Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1491: Change protocol naming convention and add block sync protocol #829

Merged
merged 3 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 22 additions & 6 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use aleph_primitives::AlephSessionApi;
use aleph_runtime::{self, opaque::Block, RuntimeApi, MAX_BLOCK_SIZE};
use finality_aleph::{
run_nonvalidator_node, run_validator_node, AlephBlockImport, AlephConfig,
JustificationNotification, Metrics, MillisecsPerBlock, Protocol, SessionPeriod,
JustificationNotification, Metrics, MillisecsPerBlock, Protocol, ProtocolNaming, SessionPeriod,
};
use futures::channel::mpsc;
use log::warn;
use sc_client_api::{Backend, HeaderBackend};
use sc_client_api::{Backend, BlockBackend, HeaderBackend};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_slots::BackoffAuthoringBlocksStrategy;
use sc_network::NetworkService;
Expand Down Expand Up @@ -212,14 +212,28 @@ fn setup(
(
RpcHandlers,
Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
ProtocolNaming,
NetworkStarter,
),
ServiceError,
> {
let genesis_hash = client
.block_hash(0)
.ok()
.flatten()
.expect("we should have a hash");
let chain_prefix = match config.chain_spec.fork_id() {
Some(fork_id) => format!("/{}/{}", genesis_hash, fork_id),
lesniak43 marked this conversation as resolved.
Show resolved Hide resolved
None => format!("/{}", genesis_hash),
};
let protocol_naming = ProtocolNaming::new(chain_prefix);
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Authentication));
.push(finality_aleph::peers_set_config(
maciejnems marked this conversation as resolved.
Show resolved Hide resolved
protocol_naming.clone(),
Protocol::Authentication,
));

let (network, system_rpc_tx, tx_handler_controller, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
Expand Down Expand Up @@ -262,7 +276,7 @@ fn setup(
telemetry: telemetry.as_mut(),
})?;

Ok((rpc_handlers, network, network_starter))
Ok((rpc_handlers, network, protocol_naming, network_starter))
}

/// Builds a new service for a full client.
Expand Down Expand Up @@ -308,7 +322,7 @@ pub fn new_authority(
let backoff_authoring_blocks = Some(LimitNonfinalized(aleph_config.max_nonfinalized_blocks()));
let prometheus_registry = config.prometheus_registry().cloned();

let (_rpc_handlers, network, network_starter) = setup(
let (_rpc_handlers, network, protocol_naming, network_starter) = setup(
config,
backend.clone(),
&keystore_container,
Expand Down Expand Up @@ -383,6 +397,7 @@ pub fn new_authority(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
};
task_manager.spawn_essential_handle().spawn_blocking(
"aleph",
Expand Down Expand Up @@ -418,7 +433,7 @@ pub fn new_full(
.path(),
);

let (_rpc_handlers, network, network_starter) = setup(
let (_rpc_handlers, network, protocol_naming, network_starter) = setup(
config,
backend.clone(),
&keystore_container,
Expand Down Expand Up @@ -460,6 +475,7 @@ pub fn new_full(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
};

task_manager.spawn_essential_handle().spawn_blocking(
Expand Down
19 changes: 10 additions & 9 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time::Duration;
use crate::{
abft::{CurrentNetworkData, LegacyNetworkData, CURRENT_VERSION, LEGACY_VERSION},
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
network::{data::split::Split, protocol_name},
network::data::split::Split,
session::{
first_block_of_session, last_block_of_session, session_id_from_block_num,
SessionBoundaries, SessionId,
Expand Down Expand Up @@ -51,7 +51,7 @@ pub use abft::{Keychain, NodeCount, NodeIndex, Recipient, SignatureSet, SpawnHan
pub use aleph_primitives::{AuthorityId, AuthorityPair, AuthoritySignature};
pub use import::AlephBlockImport;
pub use justification::{AlephJustification, JustificationNotification};
pub use network::Protocol;
pub use network::{Protocol, ProtocolNaming};
pub use nodes::{run_nonvalidator_node, run_validator_node};
pub use session::SessionPeriod;

Expand All @@ -67,21 +67,21 @@ enum Error {
}

/// Returns a NonDefaultSetConfig for the specified protocol.
pub fn peers_set_config(protocol: Protocol) -> sc_network_common::config::NonDefaultSetConfig {
let name = protocol_name(&protocol);

pub fn peers_set_config(
naming: ProtocolNaming,
protocol: Protocol,
) -> sc_network_common::config::NonDefaultSetConfig {
let mut config = sc_network_common::config::NonDefaultSetConfig::new(
name,
naming.protocol_name(&protocol),
// max_notification_size should be larger than the maximum possible honest message size (in bytes).
// Max size of alert is UNIT_SIZE * MAX_UNITS_IN_ALERT ~ 100 * 5000 = 50000 bytes
// Max size of parents response UNIT_SIZE * N_MEMBERS ~ 100 * N_MEMBERS
// When adding other (large) message types we need to make sure this limit is fine.
1024 * 1024,
);

config.set_config = match protocol {
Protocol::Authentication => sc_network_common::config::SetConfig::default(),
};
config.set_config = sc_network_common::config::SetConfig::default();
config.add_fallback_names(naming.fallback_protocol_names(&protocol));
config
}

Expand Down Expand Up @@ -254,6 +254,7 @@ pub struct AlephConfig<B: Block, H: ExHashT, C, SC, BB> {
pub backup_saving_path: Option<PathBuf>,
pub external_addresses: Vec<String>,
pub validator_port: u16,
pub protocol_naming: ProtocolNaming,
}

pub trait BlockchainBackend<B: Block> {
Expand Down
5 changes: 4 additions & 1 deletion finality-aleph/src/network/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ pub trait Network<D: Data>: Send + 'static {
async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error>;
}

/// The Authentication protocol is used for validator discovery.
/// Protocols used by the network.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum Protocol {
/// The authentication protocol is used for validator discovery.
Authentication,
/// The block synchronization protocol.
BlockSync,
}

/// Abstraction over a sender to the raw network.
Expand Down
60 changes: 47 additions & 13 deletions finality-aleph/src/network/gossip/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
pub struct Service<N: RawNetwork, D: Data> {
network: N,
messages_from_user: mpsc::UnboundedReceiver<Command<D, N::PeerId>>,
messages_for_user: mpsc::UnboundedSender<(D, N::PeerId)>,
messages_for_authentication_user: mpsc::UnboundedSender<(D, N::PeerId)>,
messages_for_block_sync_user: mpsc::UnboundedSender<(D, N::PeerId)>,
authentication_connected_peers: HashSet<N::PeerId>,
authentication_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<D>>,
block_sync_connected_peers: HashSet<N::PeerId>,
block_sync_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<D>>,
spawn_handle: SpawnTaskHandle,
}

struct ServiceInterface<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
protocol: Protocol,
messages_from_service: mpsc::UnboundedReceiver<(D, P)>,
messages_for_service: mpsc::UnboundedSender<Command<D, P>>,
}
Expand Down Expand Up @@ -70,7 +74,7 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv

fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Send(data, peer_id, Protocol::Authentication))
.unbounded_send(Command::Send(data, peer_id, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

Expand All @@ -80,17 +84,13 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv
peer_ids: HashSet<Self::PeerId>,
) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::SendToRandom(
data,
peer_ids,
Protocol::Authentication,
))
.unbounded_send(Command::SendToRandom(data, peer_ids, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

fn broadcast(&mut self, data: D) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Broadcast(data, Protocol::Authentication))
.unbounded_send(Command::Broadcast(data, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

Expand All @@ -115,20 +115,32 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
) -> (
Service<N, D>,
impl Network<D, Error = Error, PeerId = N::PeerId>,
impl Network<D, Error = Error, PeerId = N::PeerId>,
) {
let (messages_for_user, messages_from_service) = mpsc::unbounded();
let (messages_for_authentication_user, messages_from_authentication_service) =
mpsc::unbounded();
let (messages_for_block_sync_user, messages_from_block_sync_service) = mpsc::unbounded();
let (messages_for_service, messages_from_user) = mpsc::unbounded();
(
Service {
network,
messages_from_user,
messages_for_user,
messages_for_authentication_user,
messages_for_block_sync_user,
spawn_handle,
authentication_connected_peers: HashSet::new(),
authentication_peer_senders: HashMap::new(),
block_sync_connected_peers: HashSet::new(),
block_sync_peer_senders: HashMap::new(),
},
ServiceInterface {
protocol: Protocol::Authentication,
messages_from_service: messages_from_authentication_service,
messages_for_service: messages_for_service.clone(),
},
ServiceInterface {
messages_from_service,
protocol: Protocol::BlockSync,
messages_from_service: messages_from_block_sync_service,
messages_for_service,
},
)
Expand All @@ -141,6 +153,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
) -> Option<&mut TracingUnboundedSender<D>> {
match protocol {
Protocol::Authentication => self.authentication_peer_senders.get_mut(peer),
Protocol::BlockSync => self.block_sync_peer_senders.get_mut(peer),
}
}

Expand Down Expand Up @@ -211,6 +224,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
fn protocol_peers(&self, protocol: Protocol) -> &HashSet<N::PeerId> {
match protocol {
Protocol::Authentication => &self.authentication_connected_peers,
Protocol::BlockSync => &self.block_sync_connected_peers,
}
}

Expand Down Expand Up @@ -262,6 +276,12 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
self.authentication_peer_senders.insert(peer.clone(), tx);
rx
}
Protocol::BlockSync => {
let (tx, rx) = tracing_unbounded("mpsc_notification_stream_block_sync");
self.block_sync_connected_peers.insert(peer.clone());
maciejnems marked this conversation as resolved.
Show resolved Hide resolved
self.block_sync_peer_senders.insert(peer.clone(), tx);
rx
}
};
self.spawn_handle.spawn(
"aleph/network/peer_sender",
Expand All @@ -276,19 +296,33 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
self.authentication_connected_peers.remove(&peer);
self.authentication_peer_senders.remove(&peer);
}
Protocol::BlockSync => {
self.block_sync_connected_peers.remove(&peer);
self.block_sync_peer_senders.remove(&peer);
}
}
}
Messages(peer_id, messages) => {
for (protocol, data) in messages.into_iter() {
match protocol {
Protocol::Authentication => match D::decode(&mut &data[..]) {
Ok(data) => self
.messages_for_user
.messages_for_authentication_user
.unbounded_send((data, peer_id.clone()))?,
Err(e) => {
warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e)
}
},
// This is a bit of a placeholder for now, as we are not yet using this
// protocol. In the future we will not be using the same D as above.
Protocol::BlockSync => match D::decode(&mut &data[..]) {
Ok(data) => self
.messages_for_block_sync_user
.unbounded_send((data, peer_id.clone()))?,
Err(e) => {
warn!(target: "aleph-network", "Error decoding block sync protocol message: {}", e)
}
},
};
}
}
Expand Down Expand Up @@ -379,7 +413,7 @@ mod tests {

// Prepare service
let network = MockRawNetwork::new(event_stream_oneshot_tx);
let (service, gossip_network) =
let (service, gossip_network, _) =
Service::new(network.clone(), task_manager.spawn_handle());
let gossip_network = Box::new(gossip_network);

Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod tcp;
#[cfg(test)]
pub use gossip::mock::{MockEvent, MockRawNetwork};
pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService};
pub use substrate::protocol_name;
pub use substrate::{ProtocolNaming, SubstrateNetwork};

/// Represents the id of an arbitrary node.
pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send {
Expand Down