Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dropping NegotiatedSubstream should close it #2459

Closed
rkuhn opened this issue Jan 30, 2022 · 10 comments
Closed

dropping NegotiatedSubstream should close it #2459

rkuhn opened this issue Jan 30, 2022 · 10 comments

Comments

@rkuhn
Copy link
Contributor

rkuhn commented Jan 30, 2022

I’m currently implementing a streaming response protocol for high message volume (where OneShot incurs undue overhead) and found this behaviour while testing:

2022-01-30T10:43:44.464072Z TRACE libp2p_mplex::io: 9c777bdd6c1aa596: Sending Data { stream_id: LocalStreamId { num: 0, role: Listener }, data: b"\xff\x03" }    
2022-01-30T10:43:44.464140Z TRACE libp2p_mplex::io: 9c777bdd6c1aa596: Flushed substream (0/receiver)    
2022-01-30T10:43:44.464190Z TRACE libp2p_mplex::io: 9c777bdd6c1aa596: Pending reset for stream (0/receiver)    
2022-01-30T10:43:44.464194Z TRACE libp2p_mplex::io: 8b9306d027275288: Received Data { stream_id: RemoteStreamId { num: 0, role: Listener }, data: b"\xff\x03" }    
2022-01-30T10:43:44.464220Z  WARN libp2p_streaming_response::v2::handler: error in substream task: message too large sent: 114    
2022-01-30T10:43:44.464236Z TRACE libp2p_mplex::io: 8b9306d027275288: Buffering b"\xff\x03" for stream (0/initiator) (total: 1)    
2022-01-30T10:43:44.464279Z TRACE libp2p_streaming_response::v2::protocol: read 2 header bytes    
^C

If I add io.close().await? in my handler after sending those two bytes, then the receiver sees the stream close and everything works. Reading the comment for Multiplexed::drop_stream it seems that there should be a reset frame, but that is not triggered — perhaps that “next opportunity” only occurs if there is other activity polling the swarm?

@mxinden
Copy link
Member

mxinden commented Feb 1, 2022

Are you closing (/dropping) the stream only, or the connection as well?


As an aside, why are you using libp2p-mplex over libp2p-yamux? Unless one needs compatibility with the js-libp2p stack (doesn't have a Yamux implementation), I don't see a reason to give up flow control (i.e. not use libp2p-yamux).

@rkuhn
Copy link
Contributor Author

rkuhn commented Feb 2, 2022

I used mplex by virtue of copying one of the test cases from libp2p. My protocol’s output value is the NegotiatedSubstream which the ProtocolsHandler then uses to send and receive data. It is this value I’m dropping — not sure whether you call that stream or connection.

@thomaseizinger
Copy link
Contributor

Is this still an issue? The StreamMuxer implementation have undergone a fair bit of changes recently.

I am considering working on #508 next which should give us clarity over this behaviour.

@thomaseizinger
Copy link
Contributor

I don't think we can ever do this. From looking at existing implementations, it seems that dropping a stream will send a reset frame. This allows a remote to distinguish between a gracefully closed stream and an abruptly closed one.

If we don't want this behaviour, then I don't see any other application of the RESET frames given the interface that AsyncWrite offers us.

@MarcoPolo
Copy link
Contributor

@thomaseizinger I believe the original comment said that the reset frame isn't sent at all:

Reading the comment for Multiplexed::drop_stream it seems that there should be a reset frame, but that is not triggered — perhaps that “next opportunity” only occurs if there is other activity polling the swarm?

@thomaseizinger
Copy link
Contributor

@thomaseizinger I believe the original comment said that the reset frame isn't sent at all:

Reading the comment for Multiplexed::drop_stream it seems that there should be a reset frame, but that is not triggered — perhaps that “next opportunity” only occurs if there is other activity polling the swarm?

I think this will only happen if the StreamMuxer isn't polled at all anymore. With the redesign, a StreamMuxer now has a poll function that needs to be called for the muxer to make progress.

That being said, mplex doesn't use that at the moment because it was designed to make progress internally as soon as any other poll function is called.

I believe the reset frame would be sent on the next call to any of the poll functions.

@thomaseizinger
Copy link
Contributor

@mxinden Is it worth adopting the mplex implementation to use StreamMuxer::poll? I looked into it and it would require another breaking change to the StreamMuxer interface because we need to be able to return Poll::Ready(Ok(None)) because the only available function I can call on mplex is poll_flush which can just succeed with Ok(()).

@mxinden
Copy link
Member

mxinden commented Oct 4, 2022

Is it worth adopting the mplex implementation to use StreamMuxer::poll?

Given that we have yamux, which in my eyes is better than mplex in every regard (libp2p/specs#402), I don't think it makes sense to invest in mplex.

looked into it and it would require another breaking change to the StreamMuxer interface because we need to be able to return Poll::Ready(Ok(None)) because the only available function I can call on mplex is poll_flush which can just succeed with Ok(()).

When Mplex::poll_flush returns Ok(()) in, one could poll it once more until it returns Poll::Pending and then return Poll::Pending from StreamMuxer::poll, right? Thus one would not need to change the signature of StreamMuxer::poll.

@thomaseizinger
Copy link
Contributor

Is it worth adopting the mplex implementation to use StreamMuxer::poll?

Given that we have yamux, which in my eyes is better than mplex in every regard (libp2p/specs#402), I don't think it makes sense to invest in mplex.

looked into it and it would require another breaking change to the StreamMuxer interface because we need to be able to return Poll::Ready(Ok(None)) because the only available function I can call on mplex is poll_flush which can just succeed with Ok(()).

When Mplex::poll_flush returns Ok(()) in, one could poll it once more until it returns Poll::Pending and then return Poll::Pending from StreamMuxer::poll, right? Thus one would not need to change the signature of StreamMuxer::poll.

Unfortunately not. I explored that path and it will lead to endless loop because flush returns Ok(()) when it doesn't have anything to do. We could perhaps modify that for it to return Poll::Pending and register a waker that is called when new data is written to the buffer.

Not sure if it is worth the effort.

@thomaseizinger
Copy link
Contributor

We reset streams on drop to signal unexpected EOF.

@thomaseizinger thomaseizinger closed this as not planned Won't fix, can't repro, duplicate, stale Mar 29, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants