From 30fc8820373ddbaa44568cdd780bc82e8fdc4f76 Mon Sep 17 00:00:00 2001
From: Divma <26765164+divagant-martian@users.noreply.github.com>
Date: Mon, 17 Jan 2022 12:08:34 -0500
Subject: [PATCH] protocols/gossipsub: Implement unsub backoff spec changes (#2403)

Implements the changes specified by https://github.com/libp2p/specs/pull/383.

Co-authored-by: Max Inden
---
 protocols/gossipsub/CHANGELOG.md           |  4 ++
 protocols/gossipsub/src/behaviour.rs       | 26 ++++++--
 protocols/gossipsub/src/behaviour/tests.rs | 71 ++++++++++++++++++++++
 protocols/gossipsub/src/config.rs          | 26 ++++++++
 4 files changed, 122 insertions(+), 5 deletions(-)

diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md
index 07e9f2ade52..3200b372ba4 100644
--- a/protocols/gossipsub/CHANGELOG.md
+++ b/protocols/gossipsub/CHANGELOG.md
@@ -13,11 +13,15 @@
 - Fix `GossipsubConfigBuilder::build()` requiring `&self` to live for `'static`
   (see [PR 2409])
 
+- Implement Unsubscribe backoff as per [libp2p specs PR 383] (see [PR 2403]).
+
 [PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346
 [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
 [PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327
 [PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
 [PR 2409]: https://github.com/libp2p/rust-libp2p/pull/2409
+[PR 2403]: https://github.com/libp2p/rust-libp2p/pull/2403
+[libp2p specs PR 383]: https://github.com/libp2p/specs/pull/383
 
 # 0.34.0 [2021-11-16]
 
diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs
index ed803c6d59a..34fe368ef40 100644
--- a/protocols/gossipsub/src/behaviour.rs
+++ b/protocols/gossipsub/src/behaviour.rs
@@ -1049,6 +1049,7 @@ where
         topic_hash: &TopicHash,
         peer: &PeerId,
         do_px: bool,
+        on_unsubscribe: bool,
     ) -> GossipsubControlAction {
         if let Some((peer_score, ..)) = &mut self.peer_score {
             peer_score.prune(peer, topic_hash.clone());
@@ -1088,14 +1089,19 @@ where
             Vec::new()
         };
 
+        let backoff = if on_unsubscribe {
+            self.config.unsubscribe_backoff()
+        } else {
+            self.config.prune_backoff()
+        };
+
         // update backoff
-        self.backoffs
-            .update_backoff(topic_hash, peer, self.config.prune_backoff());
+        self.backoffs.update_backoff(topic_hash, peer, backoff);
 
         GossipsubControlAction::Prune {
             topic_hash: topic_hash.clone(),
             peers,
-            backoff: Some(self.config.prune_backoff().as_secs()),
+            backoff: Some(backoff.as_secs()),
         }
     }
 
@@ -1111,7 +1117,9 @@ where
         for peer in peers {
             // Send a PRUNE control message
             debug!("LEAVE: Sending PRUNE to peer: {:?}", peer);
-            let control = self.make_prune(topic_hash, &peer, self.config.do_px());
+            let on_unsubscribe = true;
+            let control =
+                self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
             Self::control_pool_add(&mut self.control_pool, peer, control);
 
             // If the peer did not previously exist in any mesh, inform the handler
@@ -1487,9 +1495,10 @@ where
 
         if !to_prune_topics.is_empty() {
             // build the prune messages to send
+            let on_unsubscribe = false;
             let prune_messages = to_prune_topics
                 .iter()
-                .map(|t| self.make_prune(t, peer_id, do_px))
+                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
                 .collect();
             // Send the prune messages to the peer
             debug!(
@@ -2598,6 +2607,9 @@ where
             // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
             // It therefore must be in at least one mesh and we do not need to inform the handler
             // of its removal from another.
+
+            // The following prunes are not due to unsubscribing.
+            let on_unsubscribe = false;
             if let Some(topics) = to_prune.remove(&peer) {
                 let mut prunes = topics
                     .iter()
@@ -2606,6 +2618,7 @@ where
                             topic_hash,
                             &peer,
                             self.config.do_px() && !no_px.contains(&peer),
+                            on_unsubscribe,
                         )
                     })
                     .collect::<Vec<_>>();
@@ -2630,6 +2643,8 @@
         }
 
         // handle the remaining prunes
+        // The following prunes are not due to unsubscribing.
+        let on_unsubscribe = false;
         for (peer, topics) in to_prune.iter() {
             let mut remaining_prunes = Vec::new();
             for topic_hash in topics {
@@ -2637,6 +2652,7 @@
                     topic_hash,
                     peer,
                     self.config.do_px() && !no_px.contains(peer),
+                    on_unsubscribe,
                 );
                 remaining_prunes.push(prune);
                 // inform the handler
diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs
index 84f4ec94275..1183f83b83c 100644
--- a/protocols/gossipsub/src/behaviour/tests.rs
+++ b/protocols/gossipsub/src/behaviour/tests.rs
@@ -2037,6 +2037,77 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_unsubscribe_backoff() {
+        const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100);
+        let config = GossipsubConfigBuilder::default()
+            .backoff_slack(1)
+            // ensure a prune_backoff > unsubscribe_backoff
+            .prune_backoff(Duration::from_secs(5))
+            .unsubscribe_backoff(1)
+            .heartbeat_interval(HEARTBEAT_INTERVAL)
+            .build()
+            .unwrap();
+
+        let topic = String::from("test");
+        // only one peer => mesh too small and will try to regraft as early as possible
+        let (mut gs, _, topics) = inject_nodes1()
+            .peer_no(1)
+            .topics(vec![topic.clone()])
+            .to_subscribe(true)
+            .gs_config(config)
+            .create_network();
+
+        let _ = gs.unsubscribe(&Topic::new(topic.clone()));
+
+        assert_eq!(
+            count_control_msgs(&gs, |_, m| match m {
+                GossipsubControlAction::Prune { backoff, .. } => backoff == &Some(1),
+                _ => false,
+            }),
+            1,
+            "Peer should be pruned with `unsubscribe_backoff`."
+        );
+
+        let _ = gs.subscribe(&Topic::new(topics[0].to_string()));
+
+        // forget all events until now
+        flush_events(&mut gs);
+
+        // call heartbeat
+        gs.heartbeat();
+
+        // Sleep for one second and apply 10 regular heartbeats (interval = 100ms).
+        for _ in 0..10 {
+            sleep(HEARTBEAT_INTERVAL);
+            gs.heartbeat();
+        }
+
+        // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat
+        // is needed).
+        assert_eq!(
+            count_control_msgs(&gs, |_, m| match m {
+                GossipsubControlAction::Graft { .. } => true,
+                _ => false,
+            }),
+            0,
+            "Graft message created too early within backoff period"
+        );
+
+        // Heartbeat one more time this should graft now
+        sleep(HEARTBEAT_INTERVAL);
+        gs.heartbeat();
+
+        // check that graft got created
+        assert!(
+            count_control_msgs(&gs, |_, m| match m {
+                GossipsubControlAction::Graft { .. } => true,
+                _ => false,
+            }) > 0,
+            "No graft message was created after backoff period"
+        );
+    }
+
     #[test]
     fn test_flood_publish() {
         let config: GossipsubConfig = GossipsubConfig::default();
diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs
index c99bef5de3e..e2758cd45da 100644
--- a/protocols/gossipsub/src/config.rs
+++ b/protocols/gossipsub/src/config.rs
@@ -77,6 +77,7 @@ pub struct GossipsubConfig {
     do_px: bool,
     prune_peers: usize,
     prune_backoff: Duration,
+    unsubscribe_backoff: Duration,
     backoff_slack: u32,
     flood_publish: bool,
     graft_flood_threshold: Duration,
@@ -276,6 +277,15 @@ impl GossipsubConfig {
         self.prune_backoff
     }
 
+    /// Controls the backoff time when unsubscribing from a topic.
+    ///
+    /// This is how long to wait before resubscribing to the topic. A short backoff period in case
+    /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
+    /// is 10 seconds.
+    pub fn unsubscribe_backoff(&self) -> Duration {
+        self.unsubscribe_backoff
+    }
+
     /// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
     /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
     /// solves problems occuring through high latencies. In particular if
@@ -421,6 +431,7 @@ impl Default for GossipsubConfigBuilder {
                 do_px: false,
                 prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
                 prune_backoff: Duration::from_secs(60),
+                unsubscribe_backoff: Duration::from_secs(10),
                 backoff_slack: 1,
                 flood_publish: true,
                 graft_flood_threshold: Duration::from_secs(10),
@@ -636,6 +647,16 @@ impl GossipsubConfigBuilder {
         self
     }
 
+    /// Controls the backoff time when unsubscribing from a topic.
+    ///
+    /// This is how long to wait before resubscribing to the topic. A short backoff period in case
+    /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
+    /// is 10 seconds.
+    pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self {
+        self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff);
+        self
+    }
+
     /// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
     /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
     /// solves problems occuring through high latencies. In particular if
@@ -777,6 +798,11 @@ impl GossipsubConfigBuilder {
                 "The following inequality doesn't hold mesh_outbound_min <= self.config.mesh_n / 2",
             );
         }
+
+        if self.config.unsubscribe_backoff.as_millis() == 0 {
+            return Err("The unsubscribe_backoff parameter should be positive.");
+        }
+
         Ok(self.config.clone())
     }
 }
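
For library users, the new option slots into the existing `GossipsubConfigBuilder` chain just like `prune_backoff`: the builder method takes the backoff in whole seconds, the getter returns a `Duration`, and `build()` rejects a zero value. Below is a minimal usage sketch, not part of the patch; it assumes the crate re-exports `GossipsubConfigBuilder` at the root of `libp2p_gossipsub` as usual.

```rust
use std::time::Duration;

use libp2p_gossipsub::GossipsubConfigBuilder;

fn main() {
    // Keep the regular prune backoff at its 60-second default, but allow
    // re-grafting five seconds after an unsubscribe-triggered PRUNE.
    let config = GossipsubConfigBuilder::default()
        .unsubscribe_backoff(5) // seconds; a value of 0 makes `build()` return an Err
        .build()
        .expect("valid gossipsub config");

    assert_eq!(config.unsubscribe_backoff(), Duration::from_secs(5));
}
```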