Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow custom gossipsub protocol id #2718

Merged
merged 10 commits into from Jul 2, 2022
13 changes: 9 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Expand Up @@ -54,7 +54,7 @@ use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::protocol::{SIGNING_PREFIX, ProtocolConfig};
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
use crate::time_cache::{DuplicateCache, TimeCache};
use crate::topic::{Hasher, Topic, TopicHash};
Expand Down Expand Up @@ -3056,12 +3056,17 @@ where
type OutEvent = GossipsubEvent;

fn new_handler(&mut self) -> Self::ConnectionHandler {
GossipsubHandler::new(
self.config.protocol_id_prefix().clone(),
let protocol_config = ProtocolConfig::new(
self.config.protocol_id().clone(),
self.config.custom_id_version().clone(),
self.config.max_transmit_size(),
self.config.validation_mode().clone(),
self.config.idle_timeout(),
self.config.support_floodsub(),
);

GossipsubHandler::new(
protocol_config,
self.config.idle_timeout(),
)
}

Expand Down
105 changes: 94 additions & 11 deletions protocols/gossipsub/src/config.rs
Expand Up @@ -49,10 +49,18 @@ pub enum ValidationMode {
None,
}

/// Selector for custom Protocol Id
#[derive(Clone, Debug, PartialEq)]
pub enum GossipsubVersion {
V1_0,
V1_1,
}

/// Configuration parameters that define the performance of the gossipsub network.
#[derive(Clone)]
pub struct GossipsubConfig {
protocol_id_prefix: Cow<'static, str>,
protocol_id: Cow<'static, str>,
custom_id_version: Option<GossipsubVersion>,
history_length: usize,
history_gossip: usize,
mesh_n: usize,
Expand Down Expand Up @@ -96,13 +104,20 @@ pub struct GossipsubConfig {
impl GossipsubConfig {
// All the getters

/// The protocol id prefix to negotiate this protocol. The protocol id is of the form
/// `/<prefix>/<supported-versions>`. As gossipsub supports version 1.0 and 1.1, there are two
/// protocol id's supported.
/// The protocol id to negotiate this protocol. By default, the resulting protocol id has the form
/// `/<prefix>/<supported-versions>`, but can optionally be changed to a literal form by providing some GossipsubVersion as custom_id_version.
/// As gossipsub supports version 1.0 and 1.1, there are two suffixes supported for the resulting protocol id.
///
/// Calling `GossipsubConfigBuilder::protocol_id_prefix` will set a new prefix and retain the prefix logic.
/// Calling `GossipsubConfigBuilder::protocol_id` will set a custom `protocol_id` and disable the prefix logic.
///
/// The default prefix is `meshsub`, giving the supported protocol ids: `/meshsub/1.1.0` and `/meshsub/1.0.0`, negotiated in that order.
pub fn protocol_id_prefix(&self) -> &Cow<'static, str> {
&self.protocol_id_prefix
pub fn protocol_id(&self) -> &Cow<'static, str> {
&self.protocol_id
}

pub fn custom_id_version(&self) -> &Option<GossipsubVersion> {
&self.custom_id_version
}

// Overlay network parameters.
Expand Down Expand Up @@ -394,7 +409,8 @@ impl Default for GossipsubConfigBuilder {
fn default() -> Self {
GossipsubConfigBuilder {
config: GossipsubConfig {
protocol_id_prefix: Cow::Borrowed("meshsub"),
protocol_id: Cow::Borrowed("meshsub"),
custom_id_version: None,
history_length: 5,
history_gossip: 3,
mesh_n: 6,
Expand Down Expand Up @@ -457,9 +473,17 @@ impl From<GossipsubConfig> for GossipsubConfigBuilder {
}

impl GossipsubConfigBuilder {
/// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`).
pub fn protocol_id_prefix(&mut self, protocol_id: impl Into<Cow<'static, str>>) -> &mut Self {
self.config.protocol_id_prefix = protocol_id.into();
/// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`).
pub fn protocol_id_prefix(&mut self, protocol_id_prefix: impl Into<Cow<'static, str>>) -> &mut Self {
self.config.custom_id_version = None;
self.config.protocol_id = protocol_id_prefix.into();
self
}

/// The full protocol id to negotiate this protocol (does not append `/1.0.0` or `/1.1.0`).
pub fn protocol_id(&mut self, protocol_id: impl Into<Cow<'static, str>>, custom_id_version: GossipsubVersion) -> &mut Self {
self.config.custom_id_version = Some(custom_id_version);
self.config.protocol_id = protocol_id.into();
self
}

Expand Down Expand Up @@ -810,7 +834,8 @@ impl GossipsubConfigBuilder {
impl std::fmt::Debug for GossipsubConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut builder = f.debug_struct("GossipsubConfig");
let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix);
let _ = builder.field("protocol_id", &self.protocol_id);
let _ = builder.field("custom_id_version", &self.custom_id_version);
let _ = builder.field("history_length", &self.history_length);
let _ = builder.field("history_gossip", &self.history_gossip);
let _ = builder.field("mesh_n", &self.mesh_n);
Expand Down Expand Up @@ -853,10 +878,14 @@ impl std::fmt::Debug for GossipsubConfig {
#[cfg(test)]
mod test {
use super::*;
use crate::{Gossipsub, MessageAuthenticity};
use crate::topic::IdentityHash;
use crate::Topic;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use libp2p_core::UpgradeInfo;
use libp2p_swarm::{ConnectionHandler, NetworkBehaviour};
use crate::types::PeerKind;

#[test]
fn create_thing() {
Expand Down Expand Up @@ -943,4 +972,58 @@ mod test {
let result = builder.message_id(&get_gossipsub_message());
assert_eq!(result, get_expected_message_id());
}

#[test]
fn create_config_with_protocol_id_prefix() {
bernardoaraujor marked this conversation as resolved.
Show resolved Hide resolved
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
.protocol_id_prefix("purple")
.validation_mode(ValidationMode::Anonymous)
.message_id_fn(message_id_plain_function)
.build()
.unwrap();

assert_eq!(builder.protocol_id(), "purple");
assert_eq!(builder.custom_id_version(), &None);

let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder)
.expect("Correct configuration");

let handler = gossipsub.new_handler();
let (protocol_config, _) = handler.listen_protocol().into_upgrade();
let protocol_ids = protocol_config.protocol_info();

assert_eq!(protocol_ids.len(), 2);

assert_eq!(protocol_ids[0].protocol_id, b"/purple/1.1.0".to_vec());
assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsubv1_1);

assert_eq!(protocol_ids[1].protocol_id, b"/purple/1.0.0".to_vec());
assert_eq!(protocol_ids[1].kind, PeerKind::Gossipsub);

}

#[test]
fn create_config_with_custom_protocol_id() {
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
.protocol_id("purple", GossipsubVersion::V1_0)
.validation_mode(ValidationMode::Anonymous)
.message_id_fn(message_id_plain_function)
.build()
.unwrap();

assert_eq!(builder.protocol_id(), "purple");
assert_eq!(builder.custom_id_version(), &Some(GossipsubVersion::V1_0));

let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder)
.expect("Correct configuration");

let handler = gossipsub.new_handler();
let (protocol_config, _) = handler.listen_protocol().into_upgrade();
let protocol_ids = protocol_config.protocol_info();

assert_eq!(protocol_ids.len(), 1);

assert_eq!(protocol_ids[0].protocol_id, b"purple".to_vec());
assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsub);
}
}
13 changes: 2 additions & 11 deletions protocols/gossipsub/src/handler.rs
Expand Up @@ -18,7 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::config::ValidationMode;
use crate::error::{GossipsubHandlerError, ValidationError};
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage};
Expand Down Expand Up @@ -164,20 +163,12 @@ enum OutboundSubstreamState {
impl GossipsubHandler {
/// Builds a new [`GossipsubHandler`].
pub fn new(
protocol_id_prefix: std::borrow::Cow<'static, str>,
max_transmit_size: usize,
validation_mode: ValidationMode,
protocol_config: ProtocolConfig,
idle_timeout: Duration,
support_floodsub: bool,
) -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(
ProtocolConfig::new(
protocol_id_prefix,
max_transmit_size,
validation_mode,
support_floodsub,
),
protocol_config,
(),
),
inbound_substream: None,
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/lib.rs
Expand Up @@ -160,7 +160,7 @@ mod rpc_proto;
pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity};
pub use self::transform::{DataTransform, IdentityTransform};

pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode};
pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubVersion, ValidationMode};
pub use self::peer_score::{
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
TopicScoreParams,
Expand Down
46 changes: 31 additions & 15 deletions protocols/gossipsub/src/protocol.rs
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::config::ValidationMode;
use crate::config::{GossipsubVersion, ValidationMode};
use crate::error::{GossipsubHandlerError, ValidationError};
use crate::handler::HandlerEvent;
use crate::rpc_proto;
Expand Down Expand Up @@ -59,21 +59,31 @@ impl ProtocolConfig {
///
/// Sets the maximum gossip transmission size.
pub fn new(
id_prefix: Cow<'static, str>,
id: Cow<'static, str>,
custom_id_peer_kind: Option<GossipsubVersion>,
max_transmit_size: usize,
validation_mode: ValidationMode,
support_floodsub: bool,
) -> ProtocolConfig {
// support version 1.1.0 and 1.0.0 with user-customized prefix
let mut protocol_ids = vec![
ProtocolId::new(id_prefix.clone(), PeerKind::Gossipsubv1_1),
ProtocolId::new(id_prefix, PeerKind::Gossipsub),
];

// add floodsub support if enabled.
if support_floodsub {
protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub));
}
let protocol_ids = match custom_id_peer_kind {
Some(v) => match v {
GossipsubVersion::V1_0 => vec![ProtocolId::new(id, PeerKind::Gossipsub, false)],
GossipsubVersion::V1_1 => vec![ProtocolId::new(id, PeerKind::Gossipsubv1_1, false)]
}
None => {
let mut protocol_ids = vec![
ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1, true),
ProtocolId::new(id, PeerKind::Gossipsub, true),
];

// add floodsub support if enabled.
if support_floodsub {
protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub, false));
}

protocol_ids
}
};

ProtocolConfig {
protocol_ids,
Expand All @@ -94,10 +104,16 @@ pub struct ProtocolId {

/// An RPC protocol ID.
impl ProtocolId {
pub fn new(prefix: Cow<'static, str>, kind: PeerKind) -> Self {
pub fn new(id: Cow<'static, str>, kind: PeerKind, prefix: bool) -> Self {
let protocol_id = match kind {
PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix, "1.1.0"),
PeerKind::Gossipsub => format!("/{}/{}", prefix, "1.0.0"),
PeerKind::Gossipsubv1_1 => match prefix {
true => format!("/{}/{}", id, "1.1.0"),
false => format!("{}", id),
},
PeerKind::Gossipsub => match prefix {
true => format!("/{}/{}", id, "1.0.0"),
false => format!("{}", id),
},
PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"),
// NOTE: This is used for informing the behaviour of unsupported peers. We do not
// advertise this variant.
Expand Down