Skip to content

Commit

Permalink
core/connection/listeners: Create event on remove_listener (#2261)
Browse files Browse the repository at this point in the history
Create a `ListenersEvent::Closed` when a listener is removed via
`Swarm::remove_listener`. This makes it more consistent with `Swarm::listen_on`,
and also informs the Swarm about the associated expired addresses.

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
elenaf9 and mxinden committed Oct 11, 2021
1 parent 3eb0344 commit 7718d1d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 14 deletions.
4 changes: 4 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
- Add `SignedEnvelope` and `PeerRecord` according to [RFC0002] and [RFC0003]
(see [PR 2107]).

- Report `ListenersEvent::Closed` when dropping a listener in `ListenersStream::remove_listener`,
return `bool` instead of `Result<(), ()>` (see [PR 2261]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2213]: https://github.com/libp2p/rust-libp2p/pull/2213
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
Expand All @@ -44,6 +47,7 @@
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195
[PR 2107]: https://github.com/libp2p/rust-libp2p/pull/2107
[PR 2261]: https://github.com/libp2p/rust-libp2p/pull/2261
[RFC0002]: https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md
[RFC0003]: https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md

Expand Down
73 changes: 63 additions & 10 deletions core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
use futures::{prelude::*, task::Context, task::Poll};
use log::debug;
use smallvec::SmallVec;
use std::{collections::VecDeque, fmt, pin::Pin};
use std::{collections::VecDeque, fmt, mem, pin::Pin};

/// Implementation of `futures::Stream` that allows listening on multiaddresses.
///
Expand Down Expand Up @@ -90,6 +90,8 @@ where
listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
/// The next listener ID to assign.
next_id: ListenerId,
/// Pending listeners events to return from [`ListenersStream::poll`].
pending_events: VecDeque<ListenersEvent<TTrans>>,
}

/// The ID of a single listener.
Expand Down Expand Up @@ -177,6 +179,7 @@ where
transport,
listeners: VecDeque::new(),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}

Expand All @@ -187,6 +190,7 @@ where
transport,
listeners: VecDeque::with_capacity(capacity),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}

Expand All @@ -213,13 +217,24 @@ where

/// Remove the listener matching the given `ListenerId`.
///
/// Return `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
self.listeners.remove(i);
Ok(())
let mut listener = self
.listeners
.remove(i)
.expect("Index can not be out of bounds.");
let listener_project = listener.as_mut().project();
let addresses = mem::take(listener_project.addresses).into_vec();
self.pending_events.push_back(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses,
reason: Ok(()),
});
true
} else {
Err(())
false
}
}

Expand All @@ -235,6 +250,10 @@ where

/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
// Return pending events from closed listeners.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
// We remove each element from `listeners` one by one and add them back.
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
Expand Down Expand Up @@ -292,18 +311,20 @@ where
});
}
Poll::Ready(None) => {
let addresses = mem::take(listener_project.addresses).into_vec();
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
addresses,
reason: Ok(()),
})
});
}
Poll::Ready(Some(Err(err))) => {
let addresses = mem::take(listener_project.addresses).into_vec();
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
addresses,
reason: Err(err),
})
});
}
}
}
Expand Down Expand Up @@ -537,4 +558,36 @@ mod tests {
}
});
}

#[test]
fn listener_closed() {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();

let mut listeners = ListenersStream::new(mem_transport);
let id = listeners.listen_on("/memory/0".parse().unwrap()).unwrap();

let event = listeners.next().await.unwrap();
let addr;
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
addr = listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}

assert!(listeners.remove_listener(id));

match listeners.next().await.unwrap() {
ListenersEvent::Closed {
listener_id,
addresses,
reason: Ok(()),
} => {
assert_eq!(listener_id, id);
assert!(addresses.contains(&addr));
}
other => panic!("Unexpected listeners event: {:?}", other),
}
});
}
}
5 changes: 3 additions & 2 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ where

/// Remove a previously added listener.
///
/// Returns `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
self.listeners.remove_listener(id)
}

Expand Down
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@
parameters to `NetworkBehaviourAction<Self::OutEvent,
Self::ProtocolsHandler>`. See [PR 2191].

- Return `bool` instead of `Result<(), ()>` for `Swarm::remove_listener`(see
[PR 2261]).

[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150
[PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2192]: https://github.com/libp2p/rust-libp2p/pull/2192
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2261]: https://github.com/libp2p/rust-libp2p/pull/2261

# 0.30.0 [2021-07-12]

Expand Down
5 changes: 3 additions & 2 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ where

/// Remove some listener.
///
/// Returns `Ok(())` if there was a listener with this ID.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
self.network.remove_listener(id)
}

Expand Down

0 comments on commit 7718d1d

Please sign in to comment.