Re-design the StreamMuxer trait #2722

Closed · 23 of 24 tasks

thomaseizinger opened this issue Jun 22, 2022 · 16 comments
Comments

@thomaseizinger (Contributor) commented Jun 22, 2022

This issue tracks the redesign of the StreamMuxer trait which was originally started in #2648.

The primary goals are:

  • Emit substreams that implement AsyncRead + AsyncWrite
  • Use &mut self for the StreamMuxer methods (this may extend to Pin<&mut Self>); see the sketch below
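
For orientation, a rough sketch of a trait satisfying these two goals (illustrative only, not the final API; it assumes the futures AsyncRead/AsyncWrite traits, std's Pin/Context/Poll, and libp2p's Multiaddr):

pub trait StreamMuxer {
    type Substream: AsyncRead + AsyncWrite;
    type Error: std::error::Error;

    // Poll for the next substream opened by the remote.
    fn poll_inbound(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

    // Poll for a new substream to be opened towards the remote.
    fn poll_outbound(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

    // Poll for a change of the remote's address.
    fn poll_address_change(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;

    // Poll the connection to be gracefully closed.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}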

PRs related to this issue

@elenaf9 (Contributor) commented Jul 31, 2022

I'd like to discuss again the interface of the StreamMuxer trait, in particular the changes done in #2724.
I am sorry for raising this now and not in the PR itself, but I only now started working on adapting the QuicMuxer (#2289) to that interface change.

In the QUIC muxer we need a way of driving the inner quinn_proto::Connection. Until now, this was done in the StreamMuxer::poll_event loop (code), but that method was removed in #2724. We now have the following options:

  1. Move this logic into StreamMuxer::poll_inbound, which more or less allows us to do the same as poll_event did before. But imo this would be a hack / unintuitive, or at the very least the name would no longer be fitting.
  2. In the QuicTransport we drive the quinn_proto::Endpoint in a separate task in the background. We could drive the Connections in this same task and use channels or a Mutex to allow the QuicMuxer to interact with the Connection. In the former case (using channels) the Substream implementation would become rather complicated. The latter case (using a Mutex) is imo also not ideal because it is easy to lose track of which entity accesses it when. Also, some of the events returned when polling the Connection are relevant for the QuicMuxer.
  3. Change the StreamMuxer interface to follow the same design as Swarm or Transport: One poll method and synchronous methods open_outbound and close. Similar to how it was initially proposed in core/muxing: Redesign StreamMuxer trait #2648.

I am in favor of the 3rd option. The StreamMuxer would then look roughly like this:

pub trait StreamMuxer {
    type Substream: AsyncRead + AsyncWrite;
    type Error: std::error::Error;
    type Info;

    fn close(&mut self) -> Result<(), Self::Error>;

    fn open_outbound(&mut self, open_info: Self::Info) -> Result<SubstreamId, Self::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<SubstreamEvent<Self::Substream>, Self::Error>>;
}

pub enum SubstreamEvent<S> {
   Substream {
       stream: S,
       id: SubstreamId
   },
   AddressChange {..}
}

I am aware that there was already a discussion in #2724 (comment) about this and that the current design was a conscious decision (e.g. to also allow backpressuring inbound substreams). Again, sorry for not engaging in that discussion back then.
I do like the new API in theory, but after working on the QuicMuxer I think in practice it is rather inconvenient for the implementer.
Therefore I'd like to reopen the discussion. The following points are the reasons why I am in favor of having one poll method:

  1. As mentioned above, we need an event loop in the QUIC muxer, which has to be driven somewhere. Doing this in poll_inbound feels like a hack, and would also mean that we cannot simply stop polling poll_inbound to enforce backpressure.
  2. I only briefly looked into the WebRTC muxer implementation (feat: Add WebRTC transport #2622), but maybe this would also allow them to get rid of their async methods and instead drive futures in the poll function? @melekes do you have an opinion on what design would work best for the WebRTC muxer implementation?
  3. Having only one poll method would, as @thomaseizinger already wrote in the original PR core/muxing: Redesign StreamMuxer trait #2648, follow the style "mutate state and return result via poll", which I personally think is a good design to enforce throughout rust-libp2p.
  4. StreamMuxer::poll_outbound is imo not that intuitive: we create a future for a new outbound substream and poll that future; if one already exists we just poll the existing one. Implementations (e.g. Yamux or WebRTC) have to solve this by using an Option, but in general this logic is rather implicit (see the sketch after this list). Just looking at the API and not at other existing muxers, I would not know how to implement it.
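
For illustration, the implicit pattern described in point 4 looks roughly like this (a hypothetical sketch, not the actual Yamux code; OutboundFut, Substream, Error and new_outbound are placeholders):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Muxer {
    // The in-flight "open an outbound substream" future, if any.
    outbound: Option<OutboundFut>,
}

impl Muxer {
    fn poll_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Substream, Error>> {
        // Lazily create the future on the first call ...
        let fut = self.outbound.get_or_insert_with(new_outbound);
        // ... and keep polling the same future on every subsequent call.
        match Pin::new(fut).poll(cx) {
            Poll::Ready(result) => {
                // Reset so that the next call opens a fresh substream.
                self.outbound = None;
                Poll::Ready(result)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}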

@thomaseizinger @mxinden what do you think? Are there drawbacks that I did not consider, or any other reasons why you prefer the current design? Also happy to hear alternative ideas on how / where to drive our inner connection in QUIC.
@kpp Since you worked on the QuicMuxer before, maybe you could also give some input on what design you think would work best?

@thomaseizinger (Contributor Author)

Thanks for commenting!

I did actually think about this problem, but perhaps the trade-offs play out differently than I thought they would. The problem I see with a single poll function is that it creates an API that is actually rather inconvenient to use, especially when we take into account the requirement of wanting to enforce back-pressure on incoming streams.

The current API is IMO rather simple because it is flat. It also simplified things quite a bit in swarm::Connection.

Assuming we want to keep the current API, then I think the way to implement this for QUIC is by copying what was done in mplex: drive the inner connection on every poll implementation and buffer events (to a certain limit) that are not the ones you "need" in the current poll function.
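
In pseudo-Rust, that pattern could look roughly like this (a sketch, not the actual mplex code; drive_connection, ConnectionEvent, pending_inbound, buffer, Substream and Error are placeholders for the muxer's internals):

fn poll_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Substream, Error>> {
    loop {
        // Hand out anything that an earlier poll already buffered.
        if let Some(stream) = self.pending_inbound.pop_front() {
            return Poll::Ready(Ok(stream));
        }
        // Otherwise drive the inner connection, regardless of which poll
        // function we are currently in.
        match self.drive_connection(cx) {
            Poll::Ready(Ok(ConnectionEvent::InboundSubstream(stream))) => {
                return Poll::Ready(Ok(stream));
            }
            Poll::Ready(Ok(other)) => {
                // Not what this function was asked for: buffer it (up to a
                // limit) so the other poll functions can return it later.
                self.buffer(other);
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        }
    }
}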

If we want a different API, then I'd suggest the following:

pub trait StreamMuxer {
    type Substream: AsyncRead + AsyncWrite; 
    type Error: std::error::Error;

    fn close(&mut self) -> Result<(), Self::Error>;

    fn open_outbound(&mut self) -> Result<SubstreamId, Self::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<SubstreamEvent<Self::Substream>, Self::Error>>;
}

pub enum SubstreamEvent<S> {
   OutgoingSubstream {
       stream: S,
       id: SubstreamId
   },
   IncomingSubstream(S),
   AddressChange {..}
}

  • We need a way to differentiate incoming and outgoing substreams
  • OpenInfo IMO does not belong in the muxer
  • Incoming streams don't have an ID

Not to be underestimated is the boilerplate that comes with this ID management, which would have to be duplicated in every muxer implementation (see the sketch below).
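
To make that concrete, the bookkeeping would look something like this in every implementation (hypothetical sketch; Pending stands for whatever per-open state a muxer tracks):

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubstreamId(u64);

struct OutboundIds<Pending> {
    next_id: u64,
    pending: HashMap<SubstreamId, Pending>,
}

impl<Pending> OutboundIds<Pending> {
    // Allocate a fresh ID for a newly requested outbound substream.
    fn insert(&mut self, state: Pending) -> SubstreamId {
        let id = SubstreamId(self.next_id);
        self.next_id += 1;
        self.pending.insert(id, state);
        id
    }

    // Pair a finished substream back up with the ID it was requested under.
    fn remove(&mut self, id: &SubstreamId) -> Option<Pending> {
        self.pending.remove(id)
    }
}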

@melekes (Contributor) commented Aug 1, 2022

I only briefly looked into the webRTC muxer implementation (#2622), but maybe this would also allow them to get rid of their async methods and instead drive futures in the poll function? @melekes do you have an opinion on what design would work best for the WebRTC muxer implementation?

I'm quite happy with the current design of StreamMuxer trait.

@kpp (Contributor) commented Aug 1, 2022

I believe I should try to move QuicMuxer to a new API to give it a try.

@thomaseizinger (Contributor Author) commented Aug 3, 2022

When discussing the integration of the new interface in QUIC, @elenaf9 mentioned that one theoretical issue we currently have is that one of the poll functions MUST be polled in order for any of the substreams to make progress.

In practice, poll_address_change will always be polled unconditionally, even when we start to enforce back-pressure on poll_inbound and no outbound streams currently need to be opened. This is very subtle, though, which is why I am proposing the following:

  • Rename poll_address_change to poll_event
  • Introduce a StreamMuxerEvent that for now only has one variant AddressChange
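
For concreteness, this is roughly what that would look like (a sketch only, assuming the rest of the trait stays unchanged and libp2p's Multiaddr type):

pub enum StreamMuxerEvent {
    // For now the only variant: the address of the remote has changed.
    AddressChange(Multiaddr),
}

// Renamed from poll_address_change; poll_inbound, poll_outbound and
// poll_close remain as they are.
fn poll_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>;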

This function can act as a general-purpose "make progress" function and all other poll functions are only called on-demand. Thoughts? @elenaf9 @mxinden @kpp @melekes

@elenaf9 (Contributor) commented Aug 3, 2022

This is very subtle, though, which is why I am proposing the following:

  • Rename poll_address_change to poll_event
  • Introduce a StreamMuxerEvent that for now only has one variant AddressChange

This function can act as a general-purpose "make progress" function and all other poll functions are only called on-demand.

Sounds good to me.


For the record:
In an out-of-band discussion with @thomaseizinger we decided to stick with the current interface for now. The main reason is that this interface allows backpressuring inbound substreams via poll_inbound, and that we found a way to adapt the QuicMuxer to it: more or less what @thomaseizinger described above, driving the inner connection in every poll implementation (and we do not even need to buffer any events). Edit: See elenaf9#6.

@kpp

I believe I should try to move QuicMuxer to a new API to give it a try.

See kpp#19 (comment).

@kpp (Contributor) commented Aug 3, 2022

Thank you. I will take a look

@thomaseizinger (Contributor Author)

When discussing the integration of the new interface in QUIC, @elenaf9 mentioned that one theoretical issue we currently have is that one of the poll functions MUST be polled in order for any of the substreams to make progress.

In practice, poll_address_change will always be polled unconditionally, even when we start to enforce back-pressure on poll_inbound and no outbound streams currently need to be opened. This is very subtle, though, which is why I am proposing the following:

  • Rename poll_address_change to poll_event
  • Introduce a StreamMuxerEvent that for now only has one variant AddressChange

This function can act as a general-purpose "make progress" function and all other poll functions are only called on-demand. Thoughts? @elenaf9 @mxinden @kpp @melekes

There is now a draft PR of the above idea here: #2797

@elenaf9 (Contributor) commented Aug 19, 2022

Something I stumbled across just now: in the swarm, we never poll StreamMuxer::poll_close when closing a connection. Instead we simply drop the future returned from StreamMuxerExt::close, and then the connection (including the muxer) itself:

Err(error) => {
    command_receiver.close();
    let (handler, _closing_muxer) = connection.close();
    // Terminate the task with the error, dropping the connection.
    let _ = events
        .send(EstablishedConnectionEvent::Closed {
            id: connection_id,
            peer_id,
            error: Some(error),
            handler,
        })
        .await;
    return;

(The above match arm is always reached when the connection closes, including graceful closing, e.g. because of KeepAlive::No.)

Before #2248 the returned future was driven to completion before dropping the connection (code).
@mxinden since you were the author of that PR: was this an intentional change?
If so, I don't think having (an async) StreamMuxer::poll_close makes any sense.

@mxinden (Member) commented Aug 20, 2022

I think there is some confusion here. We close a connection either (1) due to the command_receiver returning a Command::Close, or (2) due to the connection returning an error.

(1) command_receiver returning a Command::Close

In this case we drive the closing_muxer returned by Connection::close to completion.

See old code and new code.

(2) connection returning an error

In this case we don't drive the closing_muxer to completion. The connection returned an error, thus there is no need to gracefully close the connection.

See old code and new code.

@elenaf9 does the above make sense?

@elenaf9 (Contributor) commented Aug 20, 2022

Ah thanks, I did overlook case (1).
Though I do wonder:

(2) connection returning an error

In this case we don't drive the closing_muxer to completion. The connection returned an error, thus there is no need to gracefully close the connection.

The connection also returns an error Error::KeepAlive if all network behaviours reported that they don't need the connection anymore (code). I would say 99% of the time that's the reason why we close a connection in rust-libp2p. I would argue that in that case it still makes sense to gracefully close the connection.

@thomaseizinger (Contributor Author)

We could at least try to close it and ignore any io::Error returned from poll_close. We may have to be careful though, as this may trigger panics in the form of "polled after completion", depending on the muxer implementation.
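
For illustration, the error arm quoted above could instead do something like this (a sketch only, not a concrete proposal; whether the close needs a timeout guard is left open):

Err(error) => {
    command_receiver.close();
    let (handler, closing_muxer) = connection.close();
    // Best-effort graceful close: drive the closing future to completion and
    // ignore any io::Error, since we are tearing the connection down anyway.
    let _ = closing_muxer.await;
    let _ = events
        .send(EstablishedConnectionEvent::Closed {
            id: connection_id,
            peer_id,
            error: Some(error),
            handler,
        })
        .await;
    return;
}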

@mxinden (Member) commented Aug 20, 2022

An example of such a "polled after completion" panic: #2598

That said, I think you are right, @elenaf9. In the case of the handler (not the actual I/O resource, i.e. the connection) returning an error (e.g. Error::KeepAlive), we should try to close the connection. Good catch, I had not thought about that.

I wonder whether we should fix this earlier, or first combine HandlerWrapper and Connection and then fix this issue.

@elenaf9 (Contributor) commented Aug 20, 2022

I wonder whether we should fix this earlier, or first combine HandlerWrapper and Connection and then fix this issue.

Imo it's sufficient if we do it after / within the PR that combines HandlerWrapper and Connection. From #2796 I assume that this is planned for the near future.

@thomaseizinger (Contributor Author)

I am closing this as completed 🥳

@mxinden (Member) commented Sep 1, 2022

Very good work! The amount of small pull requests working towards a greater goal is stunning. Thanks for making it easy to review.
