Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/quic: Refactor listener handling (updated) #19

Merged
merged 32 commits into from Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0f40e51
core/muxing: Replace `Into<io::Error>` bound on `StreamMuxer` with `s…
thomaseizinger Jun 24, 2022
2c70c59
protocols/identify: Extend log message on second identify push (#2726)
mxinden Jun 26, 2022
072d736
swarm/CHANGELOG: Merge unreleased section v0.36.2 with v0.37.0 (#2727)
mxinden Jun 26, 2022
40744be
protocols/identify/: Add example (#2689)
dadepo Jun 27, 2022
f814b21
protocols/relay: Don't duplicate p2p protocol in relay reservation (#…
stormshield-pj50 Jun 27, 2022
7190952
transports/tcp/: Call take_error on tokio TcpStream (#2725)
mxinden Jun 27, 2022
9f7d0fe
build(deps): Bump styfle/cancel-workflow-action from 0.9.1 to 0.10.0 …
dependabot[bot] Jun 28, 2022
423adca
protocols/identify: Fix dev deps for example (#2737)
elenaf9 Jun 29, 2022
748588e
protocols/gossipsub: Fix a typo in error message (#2739)
ackintosh Jun 30, 2022
862ae14
protocols/rendezvous/src/client: Fix clippy warning let-unit-value (#…
mxinden Jul 1, 2022
31f1d66
Cargo.toml: Undo version bump to v0.47.0 as v0.46.0 is unreleased (#2…
mxinden Jul 1, 2022
6db5712
protocols/gossipsub/: Allow custom protocol id (#2718)
bernardoaraujor Jul 2, 2022
b28cdb3
protocols/identify: Fix race condition in discover_peer_after_disconn…
mxinden Jul 4, 2022
62622a1
core/src/transport: Poll Transport directly, remove Transport::Listen…
elenaf9 Jul 4, 2022
7df6bae
*: Prepare v0.46.0 (#2730)
mxinden Jul 5, 2022
2f2b7cb
*: Bump swarm-derive version and prepare v0.46.1 (#2747)
kpp Jul 5, 2022
d0da3a0
swarm/: Set default dial concurrency factor to 8 (#2741)
maschad Jul 7, 2022
6b758e3
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/mult…
elenaf9 Jul 10, 2022
5157ea1
transports/quic: adapt to transport trait changes
elenaf9 Jun 27, 2022
57743ef
transports/quic: support multiple listening endpoints
elenaf9 Jul 4, 2022
e5e5b34
transports/quic: re-use endpoints for dialing
elenaf9 Jul 4, 2022
39d855c
transports/quic: test endpoint re-use
elenaf9 Jul 4, 2022
0e797cf
transports/quic: use intra-doc links in docs
elenaf9 Jul 10, 2022
ef7b823
transports/quic: remove mutex in `InAddr`
elenaf9 Jul 10, 2022
a375524
transports/quic: fix clippy
elenaf9 Jul 10, 2022
917bccc
transport/quic: add test for closing a listener
elenaf9 Jul 10, 2022
5fe3e1c
Add an ignored test to dial ipv6 from ipv4
kpp Jul 21, 2022
574d534
Get rid of recursion in poll_if_addr
kpp Jul 21, 2022
9e1cc81
cargo fmt
kpp Jul 22, 2022
2c41f70
Merge pull request #5 from kpp/kpp-multiple-endpoints
elenaf9 Jul 22, 2022
1a4b8ac
quic: match socket family of endpoint for dialing
elenaf9 Jul 23, 2022
bf06d96
quic: fix outdated docs & intra-doc links
elenaf9 Jul 23, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Expand Up @@ -20,7 +20,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand Down Expand Up @@ -52,7 +52,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand All @@ -109,7 +109,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand All @@ -135,7 +135,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand All @@ -157,7 +157,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand Down
30 changes: 29 additions & 1 deletion CHANGELOG.md
Expand Up @@ -43,7 +43,35 @@

# `libp2p` facade crate

# 0.46.0 [unreleased]
# 0.47.0 [unreleased]

- Update to [`libp2p-dcutr` `v0.5.0`](protocols/dcutr/CHANGELOG.md#050).

- Update to [`libp2p-rendezvous` `v0.8.0`](protocols/rendezvous/CHANGELOG.md#080).

- Update to [`libp2p-ping` `v0.38.0`](protocols/ping/CHANGELOG.md#0380).

- Update to [`libp2p-identify` `v0.38.0`](protocols/identify/CHANGELOG.md#0380).

- Update to [`libp2p-floodsub` `v0.38.0`](protocols/floodsub/CHANGELOG.md#0380).

- Update to [`libp2p-relay` `v0.11.0`](protocols/relay/CHANGELOG.md#0110).

- Update to [`libp2p-metrics` `v0.8.0`](misc/metrics/CHANGELOG.md#080).

- Update to [`libp2p-kad` `v0.39.0`](protocols/kad/CHANGELOG.md#0390).

- Update to [`libp2p-autonat` `v0.6.0`](protocols/autonat/CHANGELOG.md#060).

- Update to [`libp2p-request-response` `v0.20.0`](protocols/request-response/CHANGELOG.md#0200).

- Update to [`libp2p-swarm` `v0.38.0`](swarm/CHANGELOG.md#0380).

# 0.46.1

- Update to `libp2p-derive` [`v0.28.0`](swarm-derive/CHANGELOG.md#0280).

# 0.46.0

- Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646].
- Added weak dependencies for features. See [PR 2646].
Expand Down
29 changes: 15 additions & 14 deletions Cargo.toml
Expand Up @@ -36,6 +36,7 @@ default = [
"websocket",
"yamux",
]

autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand Down Expand Up @@ -78,23 +79,23 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"

libp2p-autonat = { version = "0.5.0", path = "protocols/autonat", optional = true }
libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true }
libp2p-core = { version = "0.34.0", path = "core", default-features = false }
libp2p-dcutr = { version = "0.4.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.37.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.37.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.38.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.7.0", path = "misc/metrics", optional = true }
libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.37.0", path = "protocols/ping", optional = true }
libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.10.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.7.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.19.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.37.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.27.0", path = "swarm-derive" }
libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.38.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" }
libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true }
Expand All @@ -107,13 +108,13 @@ smallvec = "1.6.1"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.38.0", path = "protocols/mdns", optional = true }
libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true }
libp2p-quic = { version = "0.7.0", path = "transports/quic", optional = true }
libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
libp2p-gossipsub = { version = "0.39.0", path = "protocols/gossipsub", optional = true }
libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
9 changes: 8 additions & 1 deletion core/CHANGELOG.md
@@ -1,11 +1,18 @@
# 0.34.0 - unreleased
# 0.34.0

- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].

- Remove the concept of individual `Transport::Listener` streams from `Transport`.
Instead the `Transport` is polled directly via `Transport::poll`. The
`Transport` is now responsible for driving its listeners. See [PR 2652].

[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710
[PR 2652]: https://github.com/libp2p/rust-libp2p/pull/2652

# 0.33.0

Expand Down
19 changes: 0 additions & 19 deletions core/src/connection.rs
Expand Up @@ -43,25 +43,6 @@ impl std::ops::Add<usize> for ConnectionId {
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
122 changes: 53 additions & 69 deletions core/src/either.rs
Expand Up @@ -20,7 +20,7 @@

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
use futures::{
Expand Down Expand Up @@ -203,7 +203,7 @@ where
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = io::Error;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
Expand All @@ -212,11 +212,11 @@ where
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
}
}
Expand All @@ -237,11 +237,11 @@ where
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
.map_err(|e| e.into()),
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
.map_err(|e| e.into()),
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}
Expand All @@ -261,8 +261,8 @@ where

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand All @@ -274,48 +274,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
First(#[pin] A),
Second(#[pin] B),
}

impl<AStream, BStream, AInner, BInner, AError, BError> Stream
for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
{
type Item = Result<
ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>,
EitherError<AError, BError>,
>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::First)
.map_err(EitherError::A)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::Second)
.map_err(EitherError::B)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
},
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -385,11 +343,12 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
}
}
}

#[derive(Debug, Copy, Clone)]
#[pin_project(project = EitherTransportProj)]
#[derive(Debug)]
#[must_use = "transports do nothing unless polled"]
pub enum EitherTransport<A, B> {
Left(A),
Right(B),
Left(#[pin] A),
Right(#[pin] B),
}

impl<A, B> Transport for EitherTransport<A, B>
Expand All @@ -399,29 +358,54 @@ where
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::First(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match self.project() {
EitherTransportProj::Left(a) => match a.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::First)
.map_err(EitherError::A),
),
},
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::Second(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
EitherTransportProj::Right(b) => match b.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::Second)
.map_err(EitherError::B),
),
},
}
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
match self {
EitherTransport::Left(t) => t.remove_listener(id),
EitherTransport::Right(t) => t.remove_listener(id),
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::A(err)),
}),
EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::B(err)),
}),
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
Expand Down