Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow custom gossipsub protocol id #2718

Merged
merged 10 commits into from Jul 2, 2022
23 changes: 16 additions & 7 deletions protocols/gossipsub/src/behaviour.rs
Expand Up @@ -3056,13 +3056,22 @@ where
type OutEvent = GossipsubEvent;

fn new_handler(&mut self) -> Self::ConnectionHandler {
GossipsubHandler::new(
self.config.protocol_id_prefix().clone(),
self.config.max_transmit_size(),
self.config.validation_mode().clone(),
self.config.idle_timeout(),
self.config.support_floodsub(),
)
if self.config.support_custom() {
GossipsubHandler::new_custom(
self.config.custom_protocol_id().clone(),
self.config.max_transmit_size(),
self.config.validation_mode().clone(),
self.config.idle_timeout(),
)
} else {
GossipsubHandler::new(
self.config.protocol_id_prefix().clone(),
self.config.max_transmit_size(),
self.config.validation_mode().clone(),
self.config.idle_timeout(),
self.config.support_floodsub(),
)
}
}

fn inject_connection_established(
Expand Down
29 changes: 26 additions & 3 deletions protocols/gossipsub/src/config.rs
Expand Up @@ -53,6 +53,7 @@ pub enum ValidationMode {
#[derive(Clone)]
pub struct GossipsubConfig {
protocol_id_prefix: Cow<'static, str>,
custom_protocol_id: Cow<'static, str>,
history_length: usize,
history_gossip: usize,
mesh_n: usize,
Expand Down Expand Up @@ -90,6 +91,7 @@ pub struct GossipsubConfig {
max_ihave_messages: usize,
iwant_followup_time: Duration,
support_floodsub: bool,
support_custom: bool,
published_message_ids_cache_time: Duration,
}

Expand All @@ -105,6 +107,11 @@ impl GossipsubConfig {
&self.protocol_id_prefix
}

/// The custom protocol id. It is optional and will replace anything that is set on protocol_id_prefix.
pub fn custom_protocol_id(&self) -> &Cow<'static, str> {
&self.custom_protocol_id
}

// Overlay network parameters.
/// Number of heartbeats to keep in the `memcache` (default is 5).
pub fn history_length(&self) -> usize {
Expand Down Expand Up @@ -370,6 +377,11 @@ impl GossipsubConfig {
self.support_floodsub
}

/// Enable support for custom Gossipsub Protocol IDs. Default false.
pub fn support_custom(&self) -> bool {
self.support_custom
}

/// Published message ids time cache duration. The default is 10 seconds.
pub fn published_message_ids_cache_time(&self) -> Duration {
self.published_message_ids_cache_time
Expand All @@ -395,6 +407,7 @@ impl Default for GossipsubConfigBuilder {
GossipsubConfigBuilder {
config: GossipsubConfig {
protocol_id_prefix: Cow::Borrowed("meshsub"),
custom_protocol_id: Cow::Borrowed(""),
history_length: 5,
history_gossip: 3,
mesh_n: 6,
Expand Down Expand Up @@ -444,6 +457,7 @@ impl Default for GossipsubConfigBuilder {
max_ihave_messages: 10,
iwant_followup_time: Duration::from_secs(3),
support_floodsub: false,
support_custom: false,
published_message_ids_cache_time: Duration::from_secs(10),
},
}
Expand All @@ -457,9 +471,16 @@ impl From<GossipsubConfig> for GossipsubConfigBuilder {
}

impl GossipsubConfigBuilder {
/// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`).
pub fn protocol_id_prefix(&mut self, protocol_id: impl Into<Cow<'static, str>>) -> &mut Self {
self.config.protocol_id_prefix = protocol_id.into();
/// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`).
pub fn protocol_id_prefix(&mut self, protocol_id_prefix: impl Into<Cow<'static, str>>) -> &mut Self {
self.config.protocol_id_prefix = protocol_id_prefix.into();
self
}

/// The custom protocol id to negotiate this protocol (overrides any prefix set via `protocol_id_prefix`)
pub fn protocol_id(&mut self, protocol_id: impl Into<Cow<'static, str>>) -> &mut Self {
self.config.support_custom = true;
self.config.custom_protocol_id = protocol_id.into();
self
}

Expand Down Expand Up @@ -811,6 +832,7 @@ impl std::fmt::Debug for GossipsubConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut builder = f.debug_struct("GossipsubConfig");
let _ = builder.field("protocol_id_prefix", &self.protocol_id_prefix);
let _ = builder.field("custom_protocol_id", &self.custom_protocol_id);
let _ = builder.field("history_length", &self.history_length);
let _ = builder.field("history_gossip", &self.history_gossip);
let _ = builder.field("mesh_n", &self.mesh_n);
Expand Down Expand Up @@ -842,6 +864,7 @@ impl std::fmt::Debug for GossipsubConfig {
let _ = builder.field("max_ihave_messages", &self.max_ihave_messages);
let _ = builder.field("iwant_followup_time", &self.iwant_followup_time);
let _ = builder.field("support_floodsub", &self.support_floodsub);
let _ = builder.field("support_custom", &self.support_custom);
let _ = builder.field(
"published_message_ids_cache_time",
&self.published_message_ids_cache_time,
Expand Down
32 changes: 32 additions & 0 deletions protocols/gossipsub/src/handler.rs
Expand Up @@ -195,6 +195,38 @@ impl GossipsubHandler {
in_mesh: false,
}
}

/// Builds a new [`GossipsubHandler`] with a custom Protocol ID
pub fn new_custom(
protocol_id: std::borrow::Cow<'static, str>,
max_transmit_size: usize,
validation_mode: ValidationMode,
idle_timeout: Duration,
) -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(
ProtocolConfig::new_custom(
protocol_id,
max_transmit_size,
validation_mode,
),
(),
),
inbound_substream: None,
outbound_substream: None,
outbound_substream_establishing: false,
outbound_substreams_created: 0,
inbound_substreams_created: 0,
send_queue: SmallVec::new(),
peer_kind: None,
peer_kind_sent: false,
protocol_unsupported: false,
idle_timeout,
upgrade_errors: VecDeque::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
in_mesh: false,
}
}
}

impl ConnectionHandler for GossipsubHandler {
Expand Down
23 changes: 20 additions & 3 deletions protocols/gossipsub/src/protocol.rs
Expand Up @@ -81,6 +81,22 @@ impl ProtocolConfig {
validation_mode,
}
}

pub fn new_custom(
protocol_id: Cow<'static, str>,
max_transmit_size: usize,
validation_mode: ValidationMode
) -> ProtocolConfig {
let protocol_ids = vec![
ProtocolId::new(protocol_id, PeerKind::Custom),
];

ProtocolConfig {
protocol_ids,
max_transmit_size,
validation_mode,
}
}
}

/// The protocol ID
Expand All @@ -94,11 +110,12 @@ pub struct ProtocolId {

/// An RPC protocol ID.
impl ProtocolId {
pub fn new(prefix: Cow<'static, str>, kind: PeerKind) -> Self {
pub fn new(prefix_or_custom: Cow<'static, str>, kind: PeerKind) -> Self {
let protocol_id = match kind {
PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix, "1.1.0"),
PeerKind::Gossipsub => format!("/{}/{}", prefix, "1.0.0"),
PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix_or_custom, "1.1.0"),
PeerKind::Gossipsub => format!("/{}/{}", prefix_or_custom, "1.0.0"),
PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"),
PeerKind::Custom => format!("{}", prefix_or_custom),
// NOTE: This is used for informing the behaviour of unsupported peers. We do not
// advertise this variant.
PeerKind::NotSupported => unreachable!("Should never advertise NotSupported"),
Expand Down
3 changes: 3 additions & 0 deletions protocols/gossipsub/src/types.rs
Expand Up @@ -103,6 +103,8 @@ pub enum PeerKind {
Gossipsub,
/// A floodsub peer.
Floodsub,
/// A custom gossipsub peer.
Custom,
/// The peer doesn't support any of the protocols.
NotSupported,
}
Expand Down Expand Up @@ -372,6 +374,7 @@ impl PeerKind {
pub fn as_static_ref(&self) -> &'static str {
match self {
Self::NotSupported => "Not Supported",
Self::Custom => "Custom",
Self::Floodsub => "Floodsub",
Self::Gossipsub => "Gossipsub v1.0",
Self::Gossipsubv1_1 => "Gossipsub v1.1",
Expand Down