From 0d9bf4f2e785cf09042b3e147d18a5c7462929ee Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Mon, 20 Jun 2022 19:01:14 -0300 Subject: [PATCH 1/8] allow custom gossipsub protocol id --- protocols/gossipsub/src/behaviour.rs | 23 ++++++++++++++------ protocols/gossipsub/src/config.rs | 29 ++++++++++++++++++++++--- protocols/gossipsub/src/handler.rs | 32 ++++++++++++++++++++++++++++ protocols/gossipsub/src/protocol.rs | 23 +++++++++++++++++--- protocols/gossipsub/src/types.rs | 3 +++ 5 files changed, 97 insertions(+), 13 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index ea8bd73b89a..ec99621714f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3056,13 +3056,22 @@ where type OutEvent = GossipsubEvent; fn new_handler(&mut self) -> Self::ConnectionHandler { - GossipsubHandler::new( - self.config.protocol_id_prefix().clone(), - self.config.max_transmit_size(), - self.config.validation_mode().clone(), - self.config.idle_timeout(), - self.config.support_floodsub(), - ) + if self.config.support_custom() { + GossipsubHandler::new_custom( + self.config.custom_protocol_id().clone(), + self.config.max_transmit_size(), + self.config.validation_mode().clone(), + self.config.idle_timeout(), + ) + } else { + GossipsubHandler::new( + self.config.protocol_id_prefix().clone(), + self.config.max_transmit_size(), + self.config.validation_mode().clone(), + self.config.idle_timeout(), + self.config.support_floodsub(), + ) + } } fn inject_connection_established( diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index e2758cd45da..ee700b252d2 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -53,6 +53,7 @@ pub enum ValidationMode { #[derive(Clone)] pub struct GossipsubConfig { protocol_id_prefix: Cow<'static, str>, + custom_protocol_id: Cow<'static, str>, history_length: usize, history_gossip: usize, mesh_n: usize, @@ -90,6 +91,7 @@ pub struct GossipsubConfig { max_ihave_messages: usize, iwant_followup_time: Duration, support_floodsub: bool, + support_custom: bool, published_message_ids_cache_time: Duration, } @@ -105,6 +107,11 @@ impl GossipsubConfig { &self.protocol_id_prefix } + /// The custom protocol id. It is optional and will replace anything that is set on protocol_id_prefix. + pub fn custom_protocol_id(&self) -> &Cow<'static, str> { + &self.custom_protocol_id + } + // Overlay network parameters. /// Number of heartbeats to keep in the `memcache` (default is 5). pub fn history_length(&self) -> usize { @@ -370,6 +377,11 @@ impl GossipsubConfig { self.support_floodsub } + /// Enable support for custom Gossipsub Protocol IDs. Default false. + pub fn support_custom(&self) -> bool { + self.support_custom + } + /// Published message ids time cache duration. The default is 10 seconds. pub fn published_message_ids_cache_time(&self) -> Duration { self.published_message_ids_cache_time @@ -395,6 +407,7 @@ impl Default for GossipsubConfigBuilder { GossipsubConfigBuilder { config: GossipsubConfig { protocol_id_prefix: Cow::Borrowed("meshsub"), + custom_protocol_id: Cow::Borrowed(""), history_length: 5, history_gossip: 3, mesh_n: 6, @@ -444,6 +457,7 @@ impl Default for GossipsubConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), support_floodsub: false, + support_custom: false, published_message_ids_cache_time: Duration::from_secs(10), }, } @@ -457,9 +471,16 @@ impl From for GossipsubConfigBuilder { } impl GossipsubConfigBuilder { - /// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`). - pub fn protocol_id_prefix(&mut self, protocol_id: impl Into>) -> &mut Self { - self.config.protocol_id_prefix = protocol_id.into(); + /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`). + pub fn protocol_id_prefix(&mut self, protocol_id_prefix: impl Into>) -> &mut Self { + self.config.protocol_id_prefix = protocol_id_prefix.into(); + self + } + + /// The custom protocol id to negotiate this protocol (overrides any prefix set via `protocol_id_prefix`) + pub fn protocol_id(&mut self, protocol_id: impl Into>) -> &mut Self { + self.config.support_custom = true; + self.config.custom_protocol_id = protocol_id.into(); self } @@ -811,6 +832,7 @@ impl std::fmt::Debug for GossipsubConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix); + let _ = builder.field("custom_protocol_id", &self.custom_protocol_id); let _ = builder.field("history_length", &self.history_length); let _ = builder.field("history_gossip", &self.history_gossip); let _ = builder.field("mesh_n", &self.mesh_n); @@ -842,6 +864,7 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("max_ihave_messages", &self.max_ihave_messages); let _ = builder.field("iwant_followup_time", &self.iwant_followup_time); let _ = builder.field("support_floodsub", &self.support_floodsub); + let _ = builder.field("support_custom", &self.support_custom); let _ = builder.field( "published_message_ids_cache_time", &self.published_message_ids_cache_time, diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index c0034f09712..040586d98f5 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -195,6 +195,38 @@ impl GossipsubHandler { in_mesh: false, } } + + /// Builds a new [`GossipsubHandler`] with a custom Protocol ID + pub fn new_custom( + protocol_id: std::borrow::Cow<'static, str>, + max_transmit_size: usize, + validation_mode: ValidationMode, + idle_timeout: Duration, + ) -> Self { + GossipsubHandler { + listen_protocol: SubstreamProtocol::new( + ProtocolConfig::new_custom( + protocol_id, + max_transmit_size, + validation_mode, + ), + (), + ), + inbound_substream: None, + outbound_substream: None, + outbound_substream_establishing: false, + outbound_substreams_created: 0, + inbound_substreams_created: 0, + send_queue: SmallVec::new(), + peer_kind: None, + peer_kind_sent: false, + protocol_unsupported: false, + idle_timeout, + upgrade_errors: VecDeque::new(), + keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), + in_mesh: false, + } + } } impl ConnectionHandler for GossipsubHandler { diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 72c985f2534..659e2ade6b3 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -81,6 +81,22 @@ impl ProtocolConfig { validation_mode, } } + + pub fn new_custom( + protocol_id: Cow<'static, str>, + max_transmit_size: usize, + validation_mode: ValidationMode + ) -> ProtocolConfig { + let protocol_ids = vec![ + ProtocolId::new(protocol_id, PeerKind::Custom), + ]; + + ProtocolConfig { + protocol_ids, + max_transmit_size, + validation_mode, + } + } } /// The protocol ID @@ -94,11 +110,12 @@ pub struct ProtocolId { /// An RPC protocol ID. impl ProtocolId { - pub fn new(prefix: Cow<'static, str>, kind: PeerKind) -> Self { + pub fn new(prefix_or_custom: Cow<'static, str>, kind: PeerKind) -> Self { let protocol_id = match kind { - PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix, "1.1.0"), - PeerKind::Gossipsub => format!("/{}/{}", prefix, "1.0.0"), + PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix_or_custom, "1.1.0"), + PeerKind::Gossipsub => format!("/{}/{}", prefix_or_custom, "1.0.0"), PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"), + PeerKind::Custom => format!("{}", prefix_or_custom), // NOTE: This is used for informing the behaviour of unsupported peers. We do not // advertise this variant. PeerKind::NotSupported => unreachable!("Should never advertise NotSupported"), diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 6ffde514e37..41506b257a4 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -103,6 +103,8 @@ pub enum PeerKind { Gossipsub, /// A floodsub peer. Floodsub, + /// A custom gossipsub peer. + Custom, /// The peer doesn't support any of the protocols. NotSupported, } @@ -372,6 +374,7 @@ impl PeerKind { pub fn as_static_ref(&self) -> &'static str { match self { Self::NotSupported => "Not Supported", + Self::Custom => "Custom", Self::Floodsub => "Floodsub", Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", From 1440425f8b199366a94b8726834a41e0fc9ef5cf Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Sat, 25 Jun 2022 16:36:58 -0300 Subject: [PATCH 2/8] pass config to new_handler --- protocols/gossipsub/src/behaviour.rs | 33 ++++++++++---------- protocols/gossipsub/src/handler.rs | 45 ++-------------------------- protocols/gossipsub/src/protocol.rs | 45 +++++++++++----------------- 3 files changed, 36 insertions(+), 87 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index ec99621714f..ae5df864491 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -54,7 +54,7 @@ use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; -use crate::protocol::SIGNING_PREFIX; +use crate::protocol::{SIGNING_PREFIX, ProtocolConfig}; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; use crate::time_cache::{DuplicateCache, TimeCache}; use crate::topic::{Hasher, Topic, TopicHash}; @@ -3056,22 +3056,21 @@ where type OutEvent = GossipsubEvent; fn new_handler(&mut self) -> Self::ConnectionHandler { - if self.config.support_custom() { - GossipsubHandler::new_custom( - self.config.custom_protocol_id().clone(), - self.config.max_transmit_size(), - self.config.validation_mode().clone(), - self.config.idle_timeout(), - ) - } else { - GossipsubHandler::new( - self.config.protocol_id_prefix().clone(), - self.config.max_transmit_size(), - self.config.validation_mode().clone(), - self.config.idle_timeout(), - self.config.support_floodsub(), - ) - } + let protocol_config = ProtocolConfig::new( + match self.config.support_custom() { + true => self.config.custom_protocol_id().clone(), + false => self.config.protocol_id_prefix().clone(), + }, + self.config.max_transmit_size(), + self.config.validation_mode().clone(), + self.config.support_floodsub(), + self.config.support_custom(), + ); + + GossipsubHandler::new( + protocol_config, + self.config.idle_timeout(), + ) } fn inject_connection_established( diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 040586d98f5..77fa35f0a96 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -18,7 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::ValidationMode; use crate::error::{GossipsubHandlerError, ValidationError}; use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage}; @@ -164,52 +163,12 @@ enum OutboundSubstreamState { impl GossipsubHandler { /// Builds a new [`GossipsubHandler`]. pub fn new( - protocol_id_prefix: std::borrow::Cow<'static, str>, - max_transmit_size: usize, - validation_mode: ValidationMode, + protocol_config: ProtocolConfig, idle_timeout: Duration, - support_floodsub: bool, ) -> Self { GossipsubHandler { listen_protocol: SubstreamProtocol::new( - ProtocolConfig::new( - protocol_id_prefix, - max_transmit_size, - validation_mode, - support_floodsub, - ), - (), - ), - inbound_substream: None, - outbound_substream: None, - outbound_substream_establishing: false, - outbound_substreams_created: 0, - inbound_substreams_created: 0, - send_queue: SmallVec::new(), - peer_kind: None, - peer_kind_sent: false, - protocol_unsupported: false, - idle_timeout, - upgrade_errors: VecDeque::new(), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), - in_mesh: false, - } - } - - /// Builds a new [`GossipsubHandler`] with a custom Protocol ID - pub fn new_custom( - protocol_id: std::borrow::Cow<'static, str>, - max_transmit_size: usize, - validation_mode: ValidationMode, - idle_timeout: Duration, - ) -> Self { - GossipsubHandler { - listen_protocol: SubstreamProtocol::new( - ProtocolConfig::new_custom( - protocol_id, - max_transmit_size, - validation_mode, - ), + protocol_config, (), ), inbound_substream: None, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 659e2ade6b3..c25b348791b 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -59,37 +59,28 @@ impl ProtocolConfig { /// /// Sets the maximum gossip transmission size. pub fn new( - id_prefix: Cow<'static, str>, + id: Cow<'static, str>, max_transmit_size: usize, validation_mode: ValidationMode, support_floodsub: bool, + support_custom: bool, ) -> ProtocolConfig { - // support version 1.1.0 and 1.0.0 with user-customized prefix - let mut protocol_ids = vec![ - ProtocolId::new(id_prefix.clone(), PeerKind::Gossipsubv1_1), - ProtocolId::new(id_prefix, PeerKind::Gossipsub), - ]; - - // add floodsub support if enabled. - if support_floodsub { - protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub)); - } - - ProtocolConfig { - protocol_ids, - max_transmit_size, - validation_mode, - } - } - - pub fn new_custom( - protocol_id: Cow<'static, str>, - max_transmit_size: usize, - validation_mode: ValidationMode - ) -> ProtocolConfig { - let protocol_ids = vec![ - ProtocolId::new(protocol_id, PeerKind::Custom), - ]; + let protocol_ids = match support_custom { + true => vec![ProtocolId::new(id, PeerKind::Custom)], + false => { + // support version 1.1.0 and 1.0.0 with user-customized prefix + let mut protocol_ids = vec![ + ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1), + ProtocolId::new(id, PeerKind::Gossipsub), + ]; + + // add floodsub support if enabled. + if support_floodsub { + protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub)); + } + protocol_ids + } + }; ProtocolConfig { protocol_ids, From 0726aaea0b8fb4a8e9f18c19116f261ae7bfec78 Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Sun, 26 Jun 2022 18:50:58 -0300 Subject: [PATCH 3/8] drop PeerKind::Custom; rename protocol_id; introduce protocol_id_prefix flag --- protocols/gossipsub/src/behaviour.rs | 7 +-- protocols/gossipsub/src/config.rs | 70 ++++++++++++++++++---------- protocols/gossipsub/src/protocol.rs | 40 ++++++++-------- protocols/gossipsub/src/types.rs | 3 -- 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index ae5df864491..c8c96a94d1a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3057,14 +3057,11 @@ where fn new_handler(&mut self) -> Self::ConnectionHandler { let protocol_config = ProtocolConfig::new( - match self.config.support_custom() { - true => self.config.custom_protocol_id().clone(), - false => self.config.protocol_id_prefix().clone(), - }, + self.config.protocol_id().clone(), self.config.max_transmit_size(), self.config.validation_mode().clone(), self.config.support_floodsub(), - self.config.support_custom(), + self.config.protocol_id_is_prefix(), ); GossipsubHandler::new( diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index ee700b252d2..3807ae7214d 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -52,8 +52,8 @@ pub enum ValidationMode { /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] pub struct GossipsubConfig { - protocol_id_prefix: Cow<'static, str>, - custom_protocol_id: Cow<'static, str>, + protocol_id_prefix: bool, + protocol_id: Cow<'static, str>, history_length: usize, history_gossip: usize, mesh_n: usize, @@ -91,25 +91,22 @@ pub struct GossipsubConfig { max_ihave_messages: usize, iwant_followup_time: Duration, support_floodsub: bool, - support_custom: bool, published_message_ids_cache_time: Duration, } impl GossipsubConfig { // All the getters - /// The protocol id prefix to negotiate this protocol. The protocol id is of the form - /// `//`. As gossipsub supports version 1.0 and 1.1, there are two - /// protocol id's supported. + /// The protocol id to negotiate this protocol. By default, the resulting protocol id has the form + /// `//`, but can optionally be changed to a literal form by disabling the `protocol_id_prefix` flag. + /// As gossipsub supports version 1.0 and 1.1, there are two suffixes supported for the resulting protocol id. + /// + /// Calling `GossipsubConfigBuilder::protocol_id_prefix` will set a new prefix and retain the prefix logic. + /// Calling `GossipsubConfigBuilder::protocol_id` will set a custom `protocol_id` and disable the prefix logic. /// /// The default prefix is `meshsub`, giving the supported protocol ids: `/meshsub/1.1.0` and `/meshsub/1.0.0`, negotiated in that order. - pub fn protocol_id_prefix(&self) -> &Cow<'static, str> { - &self.protocol_id_prefix - } - - /// The custom protocol id. It is optional and will replace anything that is set on protocol_id_prefix. - pub fn custom_protocol_id(&self) -> &Cow<'static, str> { - &self.custom_protocol_id + pub fn protocol_id(&self) -> &Cow<'static, str> { + &self.protocol_id } // Overlay network parameters. @@ -377,9 +374,9 @@ impl GossipsubConfig { self.support_floodsub } - /// Enable support for custom Gossipsub Protocol IDs. Default false. - pub fn support_custom(&self) -> bool { - self.support_custom + /// Gossipsub Protocol ID follows a prefix logic. Default true. + pub fn protocol_id_is_prefix(&self) -> bool { + self.protocol_id_prefix } /// Published message ids time cache duration. The default is 10 seconds. @@ -406,8 +403,8 @@ impl Default for GossipsubConfigBuilder { fn default() -> Self { GossipsubConfigBuilder { config: GossipsubConfig { - protocol_id_prefix: Cow::Borrowed("meshsub"), - custom_protocol_id: Cow::Borrowed(""), + protocol_id_prefix: true, + protocol_id: Cow::Borrowed("meshsub"), history_length: 5, history_gossip: 3, mesh_n: 6, @@ -457,7 +454,6 @@ impl Default for GossipsubConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), support_floodsub: false, - support_custom: false, published_message_ids_cache_time: Duration::from_secs(10), }, } @@ -473,14 +469,15 @@ impl From for GossipsubConfigBuilder { impl GossipsubConfigBuilder { /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`). pub fn protocol_id_prefix(&mut self, protocol_id_prefix: impl Into>) -> &mut Self { - self.config.protocol_id_prefix = protocol_id_prefix.into(); + self.config.protocol_id_prefix = true; + self.config.protocol_id = protocol_id_prefix.into(); self } - /// The custom protocol id to negotiate this protocol (overrides any prefix set via `protocol_id_prefix`) + /// The full protocol id to negotiate this protocol (does not append `/1.0.0` or `/1.1.0`). pub fn protocol_id(&mut self, protocol_id: impl Into>) -> &mut Self { - self.config.support_custom = true; - self.config.custom_protocol_id = protocol_id.into(); + self.config.protocol_id_prefix = false; + self.config.protocol_id = protocol_id.into(); self } @@ -832,7 +829,7 @@ impl std::fmt::Debug for GossipsubConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix); - let _ = builder.field("custom_protocol_id", &self.custom_protocol_id); + let _ = builder.field("protocol_id", &self.protocol_id); let _ = builder.field("history_length", &self.history_length); let _ = builder.field("history_gossip", &self.history_gossip); let _ = builder.field("mesh_n", &self.mesh_n); @@ -864,7 +861,6 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("max_ihave_messages", &self.max_ihave_messages); let _ = builder.field("iwant_followup_time", &self.iwant_followup_time); let _ = builder.field("support_floodsub", &self.support_floodsub); - let _ = builder.field("support_custom", &self.support_custom); let _ = builder.field( "published_message_ids_cache_time", &self.published_message_ids_cache_time, @@ -966,4 +962,28 @@ mod test { let result = builder.message_id(&get_gossipsub_message()); assert_eq!(result, get_expected_message_id()); } + + #[test] + fn create_config_with_protocol_id_prefix() { + let builder: GossipsubConfig = GossipsubConfigBuilder::default() + .protocol_id_prefix("purple") + .message_id_fn(message_id_plain_function) + .build() + .unwrap(); + + assert_eq!(builder.protocol_id(), "purple"); + assert_eq!(builder.protocol_id_prefix, true); + } + + #[test] + fn create_config_with_custom_protocol_id() { + let builder: GossipsubConfig = GossipsubConfigBuilder::default() + .protocol_id("purple") + .message_id_fn(message_id_plain_function) + .build() + .unwrap(); + + assert_eq!(builder.protocol_id(), "purple"); + assert_eq!(builder.protocol_id_prefix, false); + } } diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index c25b348791b..4e4b4b72b34 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -63,24 +63,17 @@ impl ProtocolConfig { max_transmit_size: usize, validation_mode: ValidationMode, support_floodsub: bool, - support_custom: bool, + id_is_prefix: bool, ) -> ProtocolConfig { - let protocol_ids = match support_custom { - true => vec![ProtocolId::new(id, PeerKind::Custom)], - false => { - // support version 1.1.0 and 1.0.0 with user-customized prefix - let mut protocol_ids = vec![ - ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1), - ProtocolId::new(id, PeerKind::Gossipsub), - ]; - - // add floodsub support if enabled. - if support_floodsub { - protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub)); - } - protocol_ids - } - }; + let mut protocol_ids = vec![ + ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1, id_is_prefix), + ProtocolId::new(id, PeerKind::Gossipsub, id_is_prefix), + ]; + + // add floodsub support if enabled. + if support_floodsub { + protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub, false)); + } ProtocolConfig { protocol_ids, @@ -101,12 +94,17 @@ pub struct ProtocolId { /// An RPC protocol ID. impl ProtocolId { - pub fn new(prefix_or_custom: Cow<'static, str>, kind: PeerKind) -> Self { + pub fn new(id: Cow<'static, str>, kind: PeerKind, prefix: bool) -> Self { let protocol_id = match kind { - PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix_or_custom, "1.1.0"), - PeerKind::Gossipsub => format!("/{}/{}", prefix_or_custom, "1.0.0"), + PeerKind::Gossipsubv1_1 => match prefix { + true => format!("/{}/{}", id, "1.1.0"), + false => format!("{}", id), + }, + PeerKind::Gossipsub => match prefix { + true => format!("/{}/{}", id, "1.0.0"), + false => format!("{}", id), + }, PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"), - PeerKind::Custom => format!("{}", prefix_or_custom), // NOTE: This is used for informing the behaviour of unsupported peers. We do not // advertise this variant. PeerKind::NotSupported => unreachable!("Should never advertise NotSupported"), diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 41506b257a4..6ffde514e37 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -103,8 +103,6 @@ pub enum PeerKind { Gossipsub, /// A floodsub peer. Floodsub, - /// A custom gossipsub peer. - Custom, /// The peer doesn't support any of the protocols. NotSupported, } @@ -374,7 +372,6 @@ impl PeerKind { pub fn as_static_ref(&self) -> &'static str { match self { Self::NotSupported => "Not Supported", - Self::Custom => "Custom", Self::Floodsub => "Floodsub", Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", From 97749f070eef1700041235082d083f75c85e9e2a Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Wed, 29 Jun 2022 13:25:20 -0300 Subject: [PATCH 4/8] introduce GossipsubVersion enum --- protocols/gossipsub/src/behaviour.rs | 2 +- protocols/gossipsub/src/config.rs | 36 ++++++++++++++++------------ protocols/gossipsub/src/lib.rs | 2 +- protocols/gossipsub/src/protocol.rs | 32 ++++++++++++++++--------- 4 files changed, 44 insertions(+), 28 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c8c96a94d1a..7994248c1bc 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3058,10 +3058,10 @@ where fn new_handler(&mut self) -> Self::ConnectionHandler { let protocol_config = ProtocolConfig::new( self.config.protocol_id().clone(), + self.config.custom_id_version().clone(), self.config.max_transmit_size(), self.config.validation_mode().clone(), self.config.support_floodsub(), - self.config.protocol_id_is_prefix(), ); GossipsubHandler::new( diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 3807ae7214d..1d0a41ef278 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -49,11 +49,18 @@ pub enum ValidationMode { None, } +/// Selector for custom Protocol Id +#[derive(Clone, Debug, PartialEq)] +pub enum GossipsubVersion { + V1_0, + V1_1, +} + /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] pub struct GossipsubConfig { - protocol_id_prefix: bool, protocol_id: Cow<'static, str>, + custom_id_version: Option, history_length: usize, history_gossip: usize, mesh_n: usize, @@ -98,7 +105,7 @@ impl GossipsubConfig { // All the getters /// The protocol id to negotiate this protocol. By default, the resulting protocol id has the form - /// `//`, but can optionally be changed to a literal form by disabling the `protocol_id_prefix` flag. + /// `//`, but can optionally be changed to a literal form by providing some GossipsubVersion as custom_id_version. /// As gossipsub supports version 1.0 and 1.1, there are two suffixes supported for the resulting protocol id. /// /// Calling `GossipsubConfigBuilder::protocol_id_prefix` will set a new prefix and retain the prefix logic. @@ -109,6 +116,10 @@ impl GossipsubConfig { &self.protocol_id } + pub fn custom_id_version(&self) -> &Option { + &self.custom_id_version + } + // Overlay network parameters. /// Number of heartbeats to keep in the `memcache` (default is 5). pub fn history_length(&self) -> usize { @@ -374,11 +385,6 @@ impl GossipsubConfig { self.support_floodsub } - /// Gossipsub Protocol ID follows a prefix logic. Default true. - pub fn protocol_id_is_prefix(&self) -> bool { - self.protocol_id_prefix - } - /// Published message ids time cache duration. The default is 10 seconds. pub fn published_message_ids_cache_time(&self) -> Duration { self.published_message_ids_cache_time @@ -403,8 +409,8 @@ impl Default for GossipsubConfigBuilder { fn default() -> Self { GossipsubConfigBuilder { config: GossipsubConfig { - protocol_id_prefix: true, protocol_id: Cow::Borrowed("meshsub"), + custom_id_version: None, history_length: 5, history_gossip: 3, mesh_n: 6, @@ -469,14 +475,14 @@ impl From for GossipsubConfigBuilder { impl GossipsubConfigBuilder { /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`). pub fn protocol_id_prefix(&mut self, protocol_id_prefix: impl Into>) -> &mut Self { - self.config.protocol_id_prefix = true; + self.config.custom_id_version = None; self.config.protocol_id = protocol_id_prefix.into(); self } /// The full protocol id to negotiate this protocol (does not append `/1.0.0` or `/1.1.0`). - pub fn protocol_id(&mut self, protocol_id: impl Into>) -> &mut Self { - self.config.protocol_id_prefix = false; + pub fn protocol_id(&mut self, protocol_id: impl Into>, custom_id_version: GossipsubVersion) -> &mut Self { + self.config.custom_id_version = Some(custom_id_version); self.config.protocol_id = protocol_id.into(); self } @@ -828,8 +834,8 @@ impl GossipsubConfigBuilder { impl std::fmt::Debug for GossipsubConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); - let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix); let _ = builder.field("protocol_id", &self.protocol_id); + let _ = builder.field("custom_id_version", &self.custom_id_version); let _ = builder.field("history_length", &self.history_length); let _ = builder.field("history_gossip", &self.history_gossip); let _ = builder.field("mesh_n", &self.mesh_n); @@ -972,18 +978,18 @@ mod test { .unwrap(); assert_eq!(builder.protocol_id(), "purple"); - assert_eq!(builder.protocol_id_prefix, true); + assert_eq!(builder.custom_id_version(), &None); } #[test] fn create_config_with_custom_protocol_id() { let builder: GossipsubConfig = GossipsubConfigBuilder::default() - .protocol_id("purple") + .protocol_id("purple", GossipsubVersion::V1_0) .message_id_fn(message_id_plain_function) .build() .unwrap(); assert_eq!(builder.protocol_id(), "purple"); - assert_eq!(builder.protocol_id_prefix, false); + assert_eq!(builder.custom_id_version(), &Some(GossipsubVersion::V1_0)); } } diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 0168a0a2842..d86263aace4 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -160,7 +160,7 @@ mod rpc_proto; pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity}; pub use self::transform::{DataTransform, IdentityTransform}; -pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode}; +pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubVersion, ValidationMode}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 4e4b4b72b34..c0ebe72a031 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::ValidationMode; +use crate::config::{GossipsubVersion, ValidationMode}; use crate::error::{GossipsubHandlerError, ValidationError}; use crate::handler::HandlerEvent; use crate::rpc_proto; @@ -60,20 +60,30 @@ impl ProtocolConfig { /// Sets the maximum gossip transmission size. pub fn new( id: Cow<'static, str>, + custom_id_peer_kind: Option, max_transmit_size: usize, validation_mode: ValidationMode, support_floodsub: bool, - id_is_prefix: bool, ) -> ProtocolConfig { - let mut protocol_ids = vec![ - ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1, id_is_prefix), - ProtocolId::new(id, PeerKind::Gossipsub, id_is_prefix), - ]; - - // add floodsub support if enabled. - if support_floodsub { - protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub, false)); - } + let protocol_ids = match custom_id_peer_kind { + Some(v) => match v { + GossipsubVersion::V1_0 => vec![ProtocolId::new(id, PeerKind::Gossipsub, false)], + GossipsubVersion::V1_1 => vec![ProtocolId::new(id, PeerKind::Gossipsubv1_1, false)] + } + None => { + let mut protocol_ids = vec![ + ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1, true), + ProtocolId::new(id, PeerKind::Gossipsub, true), + ]; + + // add floodsub support if enabled. + if support_floodsub { + protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub, false)); + } + + protocol_ids + } + }; ProtocolConfig { protocol_ids, From c7ef6fa182c7f3a92f9b87d44cb9662302b4083c Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Thu, 30 Jun 2022 11:59:40 -0300 Subject: [PATCH 5/8] test serialised protocol ids --- protocols/gossipsub/src/config.rs | 34 +++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 1d0a41ef278..a97782d77ca 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -878,10 +878,14 @@ impl std::fmt::Debug for GossipsubConfig { #[cfg(test)] mod test { use super::*; + use crate::{Gossipsub, MessageAuthenticity}; use crate::topic::IdentityHash; use crate::Topic; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; + use libp2p_core::UpgradeInfo; + use libp2p_swarm::{ConnectionHandler, NetworkBehaviour}; + use crate::types::PeerKind; #[test] fn create_thing() { @@ -973,23 +977,53 @@ mod test { fn create_config_with_protocol_id_prefix() { let builder: GossipsubConfig = GossipsubConfigBuilder::default() .protocol_id_prefix("purple") + .validation_mode(ValidationMode::Anonymous) .message_id_fn(message_id_plain_function) .build() .unwrap(); assert_eq!(builder.protocol_id(), "purple"); assert_eq!(builder.custom_id_version(), &None); + + let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder) + .expect("Correct configuration"); + + let handler = gossipsub.new_handler(); + let (protocol_config, _) = handler.listen_protocol().into_upgrade(); + let protocol_ids = protocol_config.protocol_info(); + + assert_eq!(protocol_ids.len(), 2); + + assert_eq!(protocol_ids[0].protocol_id, b"/purple/1.1.0".to_vec()); + assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsubv1_1); + + assert_eq!(protocol_ids[1].protocol_id, b"/purple/1.0.0".to_vec()); + assert_eq!(protocol_ids[1].kind, PeerKind::Gossipsub); + } #[test] fn create_config_with_custom_protocol_id() { let builder: GossipsubConfig = GossipsubConfigBuilder::default() .protocol_id("purple", GossipsubVersion::V1_0) + .validation_mode(ValidationMode::Anonymous) .message_id_fn(message_id_plain_function) .build() .unwrap(); assert_eq!(builder.protocol_id(), "purple"); assert_eq!(builder.custom_id_version(), &Some(GossipsubVersion::V1_0)); + + let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder) + .expect("Correct configuration"); + + let handler = gossipsub.new_handler(); + let (protocol_config, _) = handler.listen_protocol().into_upgrade(); + let protocol_ids = protocol_config.protocol_info(); + + assert_eq!(protocol_ids.len(), 1); + + assert_eq!(protocol_ids[0].protocol_id, b"purple".to_vec()); + assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsub); } } From 3348f9384d357d8ec530053f848f44513371d497 Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Fri, 1 Jul 2022 15:44:11 -0300 Subject: [PATCH 6/8] note new protocol_id function to changelog --- protocols/gossipsub/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 09dec97c435..c07e4163370 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -4,6 +4,8 @@ - Update to `libp2p-swarm` `v0.37.0`. +- Allow for custom protocol ID via `GossipsubConfigBuilder::protocol_id()`. See [PR 2718]. + # 0.38.1 - Fix duplicate connection id. See [PR 2702]. From c9eee350484d7a01675b3b88dd4b780e1ca60a78 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 2 Jul 2022 03:10:13 +0200 Subject: [PATCH 7/8] Update protocols/gossipsub/CHANGELOG.md --- protocols/gossipsub/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index c07e4163370..f8c6d9e4ada 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -6,6 +6,8 @@ - Allow for custom protocol ID via `GossipsubConfigBuilder::protocol_id()`. See [PR 2718]. +[PR 2718]: https://github.com/libp2p/rust-libp2p/pull/2718/ + # 0.38.1 - Fix duplicate connection id. See [PR 2702]. From 6b3450d446ebb3fdd13a67c97afdc5d8ef534c99 Mon Sep 17 00:00:00 2001 From: "Bernardo A. Rodrigues" Date: Fri, 1 Jul 2022 22:33:26 -0300 Subject: [PATCH 8/8] rustfmt --- protocols/gossipsub/src/behaviour.rs | 7 ++----- protocols/gossipsub/src/config.rs | 28 +++++++++++++++++----------- protocols/gossipsub/src/handler.rs | 10 ++-------- protocols/gossipsub/src/protocol.rs | 4 ++-- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index fe7a86e4cb7..3ec0b117d58 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -54,7 +54,7 @@ use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; -use crate::protocol::{SIGNING_PREFIX, ProtocolConfig}; +use crate::protocol::{ProtocolConfig, SIGNING_PREFIX}; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; use crate::time_cache::{DuplicateCache, TimeCache}; use crate::topic::{Hasher, Topic, TopicHash}; @@ -3061,10 +3061,7 @@ where self.config.support_floodsub(), ); - GossipsubHandler::new( - protocol_config, - self.config.idle_timeout(), - ) + GossipsubHandler::new(protocol_config, self.config.idle_timeout()) } fn inject_connection_established( diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index a97782d77ca..93939757c63 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -474,14 +474,21 @@ impl From for GossipsubConfigBuilder { impl GossipsubConfigBuilder { /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`). - pub fn protocol_id_prefix(&mut self, protocol_id_prefix: impl Into>) -> &mut Self { + pub fn protocol_id_prefix( + &mut self, + protocol_id_prefix: impl Into>, + ) -> &mut Self { self.config.custom_id_version = None; self.config.protocol_id = protocol_id_prefix.into(); self } /// The full protocol id to negotiate this protocol (does not append `/1.0.0` or `/1.1.0`). - pub fn protocol_id(&mut self, protocol_id: impl Into>, custom_id_version: GossipsubVersion) -> &mut Self { + pub fn protocol_id( + &mut self, + protocol_id: impl Into>, + custom_id_version: GossipsubVersion, + ) -> &mut Self { self.config.custom_id_version = Some(custom_id_version); self.config.protocol_id = protocol_id.into(); self @@ -878,14 +885,14 @@ impl std::fmt::Debug for GossipsubConfig { #[cfg(test)] mod test { use super::*; - use crate::{Gossipsub, MessageAuthenticity}; use crate::topic::IdentityHash; + use crate::types::PeerKind; use crate::Topic; - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; + use crate::{Gossipsub, MessageAuthenticity}; use libp2p_core::UpgradeInfo; use libp2p_swarm::{ConnectionHandler, NetworkBehaviour}; - use crate::types::PeerKind; + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; #[test] fn create_thing() { @@ -985,8 +992,8 @@ mod test { assert_eq!(builder.protocol_id(), "purple"); assert_eq!(builder.custom_id_version(), &None); - let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder) - .expect("Correct configuration"); + let mut gossipsub: Gossipsub = + Gossipsub::new(MessageAuthenticity::Anonymous, builder).expect("Correct configuration"); let handler = gossipsub.new_handler(); let (protocol_config, _) = handler.listen_protocol().into_upgrade(); @@ -999,7 +1006,6 @@ mod test { assert_eq!(protocol_ids[1].protocol_id, b"/purple/1.0.0".to_vec()); assert_eq!(protocol_ids[1].kind, PeerKind::Gossipsub); - } #[test] @@ -1014,8 +1020,8 @@ mod test { assert_eq!(builder.protocol_id(), "purple"); assert_eq!(builder.custom_id_version(), &Some(GossipsubVersion::V1_0)); - let mut gossipsub: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, builder) - .expect("Correct configuration"); + let mut gossipsub: Gossipsub = + Gossipsub::new(MessageAuthenticity::Anonymous, builder).expect("Correct configuration"); let handler = gossipsub.new_handler(); let (protocol_config, _) = handler.listen_protocol().into_upgrade(); diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 77fa35f0a96..9ea2e6ea49f 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -162,15 +162,9 @@ enum OutboundSubstreamState { impl GossipsubHandler { /// Builds a new [`GossipsubHandler`]. - pub fn new( - protocol_config: ProtocolConfig, - idle_timeout: Duration, - ) -> Self { + pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self { GossipsubHandler { - listen_protocol: SubstreamProtocol::new( - protocol_config, - (), - ), + listen_protocol: SubstreamProtocol::new(protocol_config, ()), inbound_substream: None, outbound_substream: None, outbound_substream_establishing: false, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index c0ebe72a031..ce337c96455 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -68,8 +68,8 @@ impl ProtocolConfig { let protocol_ids = match custom_id_peer_kind { Some(v) => match v { GossipsubVersion::V1_0 => vec![ProtocolId::new(id, PeerKind::Gossipsub, false)], - GossipsubVersion::V1_1 => vec![ProtocolId::new(id, PeerKind::Gossipsubv1_1, false)] - } + GossipsubVersion::V1_1 => vec![ProtocolId::new(id, PeerKind::Gossipsubv1_1, false)], + }, None => { let mut protocol_ids = vec![ ProtocolId::new(id.clone(), PeerKind::Gossipsubv1_1, true),