Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/quic: Adapt QuicMuxer to upstream StreamMuxer changes #6

Closed
wants to merge 97 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
7c8a977
swarm/src/handler: Document responsibility limiting inbound streams (…
mxinden Jul 14, 2022
d4f8ec2
misc/metrics: Track # connected nodes supporting specific protocol (…
mxinden Jul 15, 2022
1a553db
core/muxing: Flatten `StreamMuxer` interface to `poll_{inbound,outbou…
thomaseizinger Jul 18, 2022
e95232c
build(deps): Bump Swatinem/rust-cache from 1.4.0 to 2.0.0 (#2759)
dependabot[bot] Jul 19, 2022
66c2319
transports/tcp: Bump to v0.35.0 (#2760)
mxinden Jul 19, 2022
c8066df
*: Update to `if-watch` `1.1.1` (#2754)
tgmichel Jul 19, 2022
163c5c1
README.md: Add crates.io and docs.rs badges (#2766)
LesnyRumcajs Jul 21, 2022
51a8471
build(deps): Update prometheus-client requirement from 0.16.0 to 0.17…
dependabot[bot] Jul 22, 2022
f15a3dc
core/muxing: Drop `Unpin` requirement from `SubstreamBox` (#2762)
thomaseizinger Jul 22, 2022
2e2c117
core/tests: Remove unnecessary `Arc` (#2763)
thomaseizinger Jul 22, 2022
95713ab
core: fix PR number in changelog entry (#2769)
elenaf9 Jul 23, 2022
f85a990
core/tests: Remove unnecessary util module (#2764)
thomaseizinger Jul 25, 2022
c19a211
misc/metrics: fix clippy::assign-op-pattern (#2773)
elenaf9 Jul 25, 2022
0ec3bbc
core/muxing: Remove `Unpin` requirement from `StreamMuxer::Substream`…
thomaseizinger Jul 25, 2022
74f01e4
transports/tcp: fix clippy::from-over-into (#2774)
elenaf9 Jul 25, 2022
ce963df
core: fix clippy::op-ref, clippy::needless-borrow (#2770)
elenaf9 Jul 25, 2022
56c492c
core/muxing: Drop `Sync` requirement for `StreamMuxer` on `StreamMuxe…
thomaseizinger Jul 27, 2022
09c6908
protocols/dcutr: Fix clippy lints (#2772)
elenaf9 Jul 28, 2022
eaf3f3a
.cargo: Check all features in custom-clippy (#2771)
elenaf9 Jul 28, 2022
7019d49
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/mult…
elenaf9 Jul 31, 2022
2b9e212
examples/README.md: Fix tutorial link (#2790)
lukehinds Aug 2, 2022
028dece
core/muxing: Have functions on `StreamMuxer` take `Pin<&mut Self>` (#…
thomaseizinger Aug 3, 2022
07c0dba
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Aug 3, 2022
57840a3
transports/quic: adapt QuicMuxer to libp2p#2724
elenaf9 Aug 3, 2022
579b1be
swarm-derive/: Generate OutEvent if not provided (#2792)
mxinden Aug 8, 2022
e2b83b7
SECURITY.md: Document supported releases and security mail addr (#2800)
mxinden Aug 8, 2022
3da8b42
README: Point to security@libp2p.io (#2799)
mxinden Aug 8, 2022
1012579
protocols/: Remove passing default variant to `WithPeerId::condition`…
K0UR05H Aug 10, 2022
a4110a2
*: Remove `inject_connected` / `inject_disconnected` from docs (#2805)
K0UR05H Aug 10, 2022
0a01c81
misc/multistream-select: Replace msg.get(0) with msg.first() (#2816)
mxinden Aug 13, 2022
3ce0ef9
transports/quic: apply suggestions from review
elenaf9 Aug 13, 2022
3060d12
transports/quic: rename QuicMuxerInner -> Inner
elenaf9 Aug 13, 2022
63c6edc
transports/quic: improve poll_{inbound, outbound}
elenaf9 Aug 13, 2022
06aaea6
*: Fix `clippy::derive-partial-eq-without-eq` (#2818)
elenaf9 Aug 14, 2022
cef5056
core/muxing: Generalise `StreamMuxer::poll_address_change` to `poll` …
thomaseizinger Aug 16, 2022
0e5a25d
examples/file-sharing: Support binary files (#2786)
qidu Aug 16, 2022
878c49f
swarm/src/behaviour: Deprecate NetworkBehaviourEventProcess (#2784)
mxinden Aug 16, 2022
6a9fa3d
build(deps): Update prost requirement from 0.10 to 0.11 (#2788)
dependabot[bot] Aug 16, 2022
8dc0188
swarm/src/connection: Test max_negotiating_inbound_streams (#2785)
mxinden Aug 16, 2022
67266c6
swarm-derive/: Add where clause of behaviour to generated out event (…
mxinden Aug 17, 2022
d2c5053
build(deps): Update prometheus-client requirement from 0.17.0 to 0.18…
dependabot[bot] Aug 17, 2022
a2738fd
swarm-derive/: Derive Debug for generated OutEvent (#2821)
mxinden Aug 17, 2022
475289c
docs/coding-guidelines: Add document (#2780)
mxinden Aug 17, 2022
8931860
core/identity: Allow clippy::large-enum-variant on `Keypair` (#2827)
elenaf9 Aug 19, 2022
1aeaba3
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Aug 19, 2022
95fc6da
transports/quic: drive connection in `QuicMuxer::poll`
elenaf9 Aug 19, 2022
3d3666e
*: Enforce no clippy warnings for examples (#2826)
thomaseizinger Aug 20, 2022
217dd2c
clippy.toml: Create config and disallow unbounded channels (#2823)
mxinden Aug 20, 2022
0d7c8a5
transports/quic: refactor `Connection::poll_event`
elenaf9 Aug 21, 2022
67b52aa
transports/quic: rm `Connection::is_handshaking`
elenaf9 Aug 21, 2022
66974fc
transports/quic: refactor connection closing
elenaf9 Aug 22, 2022
4253080
*: Prepare v0.47.0 (#2830)
mxinden Aug 22, 2022
c88efe8
transports/quic: rm mutex around to_endpoint tx
elenaf9 Aug 22, 2022
0a82be4
transports/quic/tests: drive peers concurrently
elenaf9 Aug 22, 2022
d610e4b
protocols/dcutr: Disable `libp2p-core` default features (#2836)
elenaf9 Aug 23, 2022
d92cab8
build(deps): Update p256 requirement from 0.10.0 to 0.11.0 (#2636)
dependabot[bot] Aug 23, 2022
ca07ce4
swarm/behaviour: Remove deprecated NetworkBehaviourEventProcess (#2840)
mxinden Aug 26, 2022
a3dec47
docs/coding-guidelines: Document limit on number of tasks (#2839)
mxinden Aug 26, 2022
247b553
swarm-derive/: Remove support for custom poll method (#2841)
mxinden Aug 28, 2022
6855ab9
swarm-derive/: Remove support for ignoring fields on struct (#2842)
mxinden Aug 29, 2022
e01f77b
transports/noise: Migrate away from deprecated `sodiumoxide` for test…
pinkforest Aug 30, 2022
f16561c
.github/workflows: Split advisory issues from PR workflows using `car…
pinkforest Aug 30, 2022
36a2773
*: Update changelogs for prost dep update (#2851)
divagant-martian Aug 30, 2022
89f898c
protocols/mdns: Allow users to choose between async-io and tokio runt…
gallegogt Sep 2, 2022
cee199a
protocols/kad: Support multiple protocol names (#2846)
dmitry-markin Sep 3, 2022
f04df29
.git-blame-ignore-revs/: Initialize and add rustfmt commit (#2864)
thomaseizinger Sep 4, 2022
b8c3b28
protocols/gossipsub: Allow publishing to anything that implements `In…
GamePad64 Sep 5, 2022
a40180c
.github/: Introduce interop tests (#2835)
laurentsenta Sep 7, 2022
8644c65
core/: Introduce `rsa` feature flag to avoid `ring` dependency (#2860)
GamePad64 Sep 7, 2022
2eca38c
core/upgrade/: Add `ReadyUpgrade` (#2855)
thomaseizinger Sep 7, 2022
d2eddf4
muxers/yamux: Remove `OpenSubstreamToken` (#2873)
thomaseizinger Sep 7, 2022
83c6795
*: Prepare v0.48.0 (#2869)
mxinden Sep 7, 2022
c650dc1
*: Replace _serde with dep:serde in Cargo.toml (#2868)
GamePad64 Sep 8, 2022
69caf98
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Sep 9, 2022
fe3e09b
transports/quic: upgrade to if-watch v2.0.0
elenaf9 Sep 9, 2022
b6924db
transports/quic: fix clippy
elenaf9 Sep 9, 2022
689460f
transports/quic: fix smoke test
elenaf9 Sep 9, 2022
457fb51
transports/tcp: Simplify IfWatcher integration (#2813)
elenaf9 Sep 10, 2022
41d39fb
transports/quic: add `Endpoint::try_send`
elenaf9 Sep 10, 2022
66c2755
swarm/: Fix rare test failure of `multiple_addresses_err` (#2882)
thomaseizinger Sep 11, 2022
72bade1
build(deps): Update env_logger to 0.9 and criterion to 0.4 (#2896)
kpp Sep 14, 2022
5906140
protocols/kad: Remove deprecated `set_protocol_name()` (#2866)
dmitry-markin Sep 15, 2022
2c739e9
protocols/noise: Introduce `NoiseAuthenticated::xx` constructor with …
thomaseizinger Sep 16, 2022
c81b06a
*: Fix various clippy warnings (#2900)
umgefahren Sep 16, 2022
2025de3
swarm-derive/: Allow for templated behaviours (#2907)
thomaseizinger Sep 16, 2022
4c617a0
subscribe
elenaf9 Sep 17, 2022
4e027b1
transports/quic: handle substream being dropped
elenaf9 Sep 19, 2022
bdba780
transports/quic: return err on read after reset
elenaf9 Sep 19, 2022
40cb4f3
transports/quic: apply comments from code review
elenaf9 Sep 19, 2022
f8d1430
transports/quic: better naming, fix docs
elenaf9 Sep 20, 2022
4c3229b
transports/quic: add doc for `Endpoint:try_send`
elenaf9 Sep 20, 2022
e393fe5
transports/quic: add `ip_to_listenaddr`
elenaf9 Sep 20, 2022
d28db18
transports/quic: disable connection migration
elenaf9 Sep 20, 2022
42db0ed
transports/quic: minor fix
elenaf9 Sep 20, 2022
d46b72e
transports/quic: minor fixes
elenaf9 Sep 20, 2022
ec3c74a
transports/quic: rework forwarding of new connections
elenaf9 Sep 20, 2022
b7103aa
transports/quic: fix broken intra-doc link
elenaf9 Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
303 changes: 142 additions & 161 deletions transports/quic/src/muxer.rs
Expand Up @@ -22,7 +22,7 @@ use crate::connection::{Connection, ConnectionEvent};
use crate::error::Error;

use futures::{AsyncRead, AsyncWrite};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::muxing::StreamMuxer;
use parking_lot::Mutex;
use std::{
collections::{HashMap, VecDeque},
Expand Down Expand Up @@ -54,6 +54,60 @@ struct QuicMuxerInner {
poll_event_waker: Option<Waker>,
}

impl QuicMuxerInner {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
fn poll_connection(&mut self, cx: &mut Context<'_>) {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
while let Poll::Ready(event) = self.connection.poll_event(cx) {
match event {
ConnectionEvent::Connected => {
tracing::error!("Unexpected Connected event on established QUIC connection");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error seems a bit much here. I wouldn't won't be woken at 3am because my production app is reporting _error_s and then seeing it is this one which is practically harmless :)

Why is this unexpected in the first place?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectionEvent::Connected should only be returned a single time, which is when we finished all the crypto and established a connection.
In quic::upgrade::Update we poll a pending new connection until it returns ConnectionEvent::Connected. Only then we create the QuicMuxer for this connection. Hence within QuicMuxer the event should not happen again.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it really should not happen, then perhaps put a debug assert here?

}
ConnectionEvent::ConnectionLost(_) => {
if let Some(waker) = self.poll_close_waker.take() {
waker.wake();
}
self.connection.close();
}

ConnectionEvent::StreamOpened => {
if let Some(waker) = self.pending_substreams.pop_front() {
waker.wake();
}
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
ConnectionEvent::StreamReadable(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
if let Some(waker) = substream.read_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamWritable(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
if let Some(waker) = substream.write_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamFinished(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
substream.finished = true;
if let Some(waker) = substream.finished_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamStopped(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
substream.stopped = true;
}
}
ConnectionEvent::StreamAvailable => {
// Handled below.
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}

/// State of a single substream.
#[derive(Default, Clone)]
struct SubstreamState {
Expand Down Expand Up @@ -89,6 +143,93 @@ impl QuicMuxer {
}
}
}
impl StreamMuxer for QuicMuxer {
type Substream = Substream;
type Error = Error;

fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<libp2p_core::Multiaddr, Self::Error>> {
self.inner.lock().poll_connection(cx);
// TODO
Poll::Pending
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
inner.poll_connection(cx);
if let Some(substream_id) = inner.connection.pop_incoming_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(substream))
} else {
inner.poll_event_waker = Some(cx.waker().clone());
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
Poll::Pending
}
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}

fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
inner.poll_connection(cx);
if let Some(substream_id) = inner.connection.pop_outgoing_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(substream))
} else {
inner.pending_substreams.push_back(cx.waker().clone());
Poll::Pending
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();
inner.poll_connection(cx);

if inner.connection.connection.is_drained() {
return Poll::Ready(Ok(()));
}

if inner.substreams.is_empty() {
let connection = &mut inner.connection;
if !connection.connection.is_closed() {
connection.close();
if let Some(waker) = inner.poll_event_waker.take() {
waker.wake();
}
} else {
}
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
while let Poll::Ready(event) = inner.connection.poll_event(cx) {
if let ConnectionEvent::ConnectionLost(_) = event {
return Poll::Ready(Ok(()));
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assume this returns Poll::Pending at some point. Then we will break out of this loop and register a waker further down. Once we are woken, poll_close gets called again and we go straight into inner.poll_connection which will likely yield the ConnectionEvent::ConnectionLost. I don't think we will be observing it here then, right?

I think we may not want to call poll_connection at the top here or maybe return an error from poll_connection in case it has been closed?

} else {
for substream in inner.substreams.clone().keys() {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
if let Err(e) = inner.connection.shutdown_substream(*substream) {
tracing::error!("substream finish error on muxer close: {}", e);
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Register `cx.waker()` as being woken up if the connection closes.
inner.poll_close_waker = Some(cx.waker().clone());

Poll::Pending

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we upholding the poll-contract in case the list of substreams is not empty?

}
}

impl fmt::Debug for QuicMuxer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("QuicMuxer").finish()
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
}

pub struct Substream {
id: quinn_proto::StreamId,
Expand Down Expand Up @@ -270,163 +411,3 @@ impl AsyncWrite for Substream {
}
}
}

impl StreamMuxer for QuicMuxer {
type OutboundSubstream = ();
type Substream = Substream;
type Error = Error;

/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
///
/// If `Pending` is returned, then the current task will be notified once the muxer
/// is ready to be polled, similar to the API of `Stream::poll()`.
/// Only the latest task that was used to call this method may be notified.
///
/// It is permissible and common to use this method to perform background
/// work, such as processing incoming packets and polling timers.
///
/// An error can be generated if the connection has been closed.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
// We use `poll_event` to perform the background processing of the entire connection.
let mut inner = self.inner.lock();

while let Poll::Ready(event) = inner.connection.poll_event(cx) {
match event {
ConnectionEvent::Connected => {
tracing::error!("Unexpected Connected event on established QUIC connection");
}
ConnectionEvent::ConnectionLost(_) => {
if let Some(waker) = inner.poll_close_waker.take() {
waker.wake();
}
inner.connection.close();
}

ConnectionEvent::StreamOpened => {
if let Some(waker) = inner.pending_substreams.pop_front() {
waker.wake();
}
}
ConnectionEvent::StreamReadable(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
if let Some(waker) = substream.read_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamWritable(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
if let Some(waker) = substream.write_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamFinished(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
substream.finished = true;
if let Some(waker) = substream.finished_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamStopped(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
substream.stopped = true;
}
}
ConnectionEvent::StreamAvailable => {
// Handled below.
}
}
}

if let Some(substream_id) = inner.connection.pop_incoming_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream)))
} else {
inner.poll_event_waker = Some(cx.waker().clone());
Poll::Pending
}
}

/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
///
/// We provide the same handler to poll it by multiple tasks, which is done as a FIFO
/// queue via `poll_outbound`.
fn open_outbound(&self) -> Self::OutboundSubstream {}

/// Polls the outbound substream.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be polled, similar to the API of `Future::poll()`.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
_: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
if let Some(substream_id) = inner.connection.pop_outgoing_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(substream))
} else {
inner.pending_substreams.push_back(cx.waker().clone());
Poll::Pending
}
}

/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
fn destroy_outbound(&self, _: Self::OutboundSubstream) {
// Do nothing because we don't know which waker should be destroyed.
// TODO `Self::OutboundSubstream` -> autoincrement id.
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();

if inner.connection.connection.is_drained() {
return Poll::Ready(Ok(()));
}

if inner.substreams.is_empty() {
let connection = &mut inner.connection;
if !connection.connection.is_closed() {
connection.close();
if let Some(waker) = inner.poll_event_waker.take() {
waker.wake();
}
} else {
}
while let Poll::Ready(event) = inner.connection.poll_event(cx) {
if let ConnectionEvent::ConnectionLost(_) = event {
return Poll::Ready(Ok(()));
}
}
} else {
for substream in inner.substreams.clone().keys() {
if let Err(e) = inner.connection.shutdown_substream(*substream) {
tracing::error!("substream finish error on muxer close: {}", e);
}
}
}

// Register `cx.waker()` as being woken up if the connection closes.
inner.poll_close_waker = Some(cx.waker().clone());

Poll::Pending
}
}

impl fmt::Debug for QuicMuxer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("QuicMuxer").finish()
}
}