Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

muxers/yamux: Mitigation of unnecessary stream drops #3071

Merged
merged 4 commits into from
Nov 15, 2022

Conversation

thomaseizinger
Copy link
Contributor

@thomaseizinger thomaseizinger commented Oct 30, 2022

Description

A busy network connection can easily fill the internal buffer. We utilise wake_by_ref so we can return Poll::Pending without breaking the poll-function contract. This gives callers a chance to call poll_inbound and empty the buffer.

Fixes #3041.

Notes

cc @rkuhn

Links to any relevant issues

Open Questions

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

Copy link
Contributor

@rkuhn rkuhn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from the nitpick it looks good to me, thanks!

@@ -123,6 +140,8 @@ where
return Poll::Ready(Ok(stream));
}

self.inbound_stream_waker = Some(cx.waker().clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The poll contract says that a waker needs to be registered in case Poll::Pending is returned. While perhaps this unconditional registration may be legal, I think it still violates expectations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This registration is not unconditional. It is conditional on the list being empty! The list no longer being empty is IMO a valid reason to wake the task that last polled here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, are you saying that self.poll_inner will return Poll::Pending? That would be fine, then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTOH: even if the inner stream does return Poll::Pending, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?

In general, the Rust async rules are thus: if you create a Poll::Pending, then you’re responsible for waking a waker. Otherwise you just pass along poll results.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, are you saying that self.poll_inner will return Poll::Pending? That would be fine, then.

Not quite.

This poll function essentially should we polled again in two different scenarios:

  1. If the general poll-function pushed a new stream to the buffer.
  2. If the socket has more bytes to read (that is poll_inner)

Registering this waker here isn't strictly necessary because we poll the StreamMuxer in swarm::Connection in a loop anyway. But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.

OTOH: even if the inner stream does return Poll::Pending, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?

Like I said, it probably works without too because we will always be implicitly woken again, even if the task that calls the general poll consumes everything from the inner socket already and pushed them to the buffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.

I agree with this. I don't think the inner working of impl StreamMuxer for Yamux should make assumptions on how it is called in Connection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making things fail due to a missed wake-up would require polling this task through Arc<Mutex<...>>, which I hope nobody would ever consider doing, so I agree that this is harmless.

This discussion is yet another spotlight on how difficult it is to correctly reason about poll functions. This is exacerbated within libp2p by extending the concept — like is being done here — in non-trivial ways. If there actually was some surrounding task that did care about being woken for two different reasons, it would have to manufacture its own wakers (like FuturesUnordered does). But even that usage would be broken because the Waker destined for poll_inbound may be passed to poll_inner as well, overwriting a previously registered Waker that was destined for that purpose.

While debugging wake-up loops is painful, it is orders of magnitude easier than debugging missed wake-ups. May I suggest that we adhere to the policy that each async object offers exactly one poll function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not have poll semantics, because that can lead to confusing behaviour and bugs.

The following pattern is what I suggest:

if let Poll::Ready(x) = connection.poll(cx) {
    ...
    return x;
}
for sub in connection.inbound_streams.drain(..) {
    ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I suggest that we adhere to the policy that each async object offers exactly one poll function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not have poll semantics, because that can lead to confusing behaviour and bugs.

The design of this trait was inspired by the AsyncWrite design which also has multiple poll_ functions that users need to drive. Sink is similar.

One problem with a single "poll" function design is that it puts more burden on implementations. For example, opening a new stream is not instantaneous, it may require negotiation of new credits with the other party. As such, a "new_outbound" function can only give you an ID or some other kind of handle for a new stream. This means every implementation needs to implement some kind of "stream ID" management. In contrast to that a poll_new_outbound function can just return Pending until the new stream is ready to be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never found Sink usable for anything I had to do, so its design carries little weight in my opinion.

Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on. The example of a “new_outbound” function boils down to the choice of polling the machinery until the new connection is ready, ignoring everything else that happens in the meantime. This already requires the machinery to aggregate its output and let the poller inspect it, for which there is no reason a priori to offer a poll-shaped API. In particular, no Context is necessary to ask whether events have been emitted, which removes one prolific source of confusion inherent to Rust’s Task design.

So my solution to the new_outbound problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So my solution to the new_outbound problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.

This is basically the poll_ready& start_send API of Sink then, yes?

Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on.

poll_outbound is not the only issue. The problem of having to buffer streams is mostly because yamux doesn't allow us to backpressure the number of streams. The QUIC muxer on the other hand allows us to make progress on the connection itself without necessarily accepting new inbound streams. I am happy to change the API for something better but so far I've not found a solution where the caller (swarm::Connection) can explicitly signal to the muxer that it is now able to take more inbound streams.

We could move away from poll_inbound by having just a pop_inbound function. This however then requires more documentation on when the caller should call this function again if it ever returns None. At that stage, we are just re-inventing the wheel when we could also be using Poll and automatically wake the task when we know that there are new inbound streams available.

this.inbound_stream_buffer.push_back(inbound_stream);

if let Some(waker) = this.inbound_stream_waker.take() {
waker.wake()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The discussion above makes me think: is this muxer polled from multiple tasks? If it is, then poll_inner will probably switch out wakers all the time, making it non-deterministic which caller will eventually be woken. If not, then this extra wakeup makes little sense to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least in rust-libp2p production code, it is only polled from a single task but it is a public interface so there may be other consumers, #2952 for example.

@mxinden
Copy link
Member

mxinden commented Nov 4, 2022

Thanks @thomaseizinger for providing the hotfix. I will take a deeper look.

In the meantime @rkuhn can you confirm that this improves or resolves the issue you reported in #3041?

@mergify

This comment was marked as resolved.

Copy link
Contributor

@rkuhn rkuhn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long delay! I have now tested this branch (patched back to 0.49 in top-level Cargo.toml for drop-in replacement) with the ipfs-embed test suite and it works nicely! I don’t need to increase max_buffered_inbound_streams, increasing max_negotiating_inbound_streams suffices, also when hammering bitswap with 1800 Wants at once.

@thomaseizinger
Copy link
Contributor Author

Sorry for the long delay! I have now tested this branch (patched back to 0.49 in top-level Cargo.toml for drop-in replacement) with the ipfs-embed test suite and it works nicely! I don’t need to increase max_buffered_inbound_streams, increasing max_negotiating_inbound_streams suffices, also when hammering bitswap with 1800 Wants at once.

That is great to hear!

I am inclined to remove the configuration knob again then. Less config surface is better IMO.

@thomaseizinger thomaseizinger changed the title muxers/yamux: Attempt mitigation of unnecessary stream drops muxers/yamux: Mitigation of unnecessary stream drops Nov 8, 2022
@mergify

This comment was marked as resolved.

@thomaseizinger
Copy link
Contributor Author

@mxinden I've rebased this PR onto the 0.49.0 release. To actually make the patch release, you only need to checkout this branch and go one commit back (git checkout HEAD^).

Let me know if you need any other changes. I'd suggest we do the release first and then merge this PR!

@thomaseizinger
Copy link
Contributor Author

thomaseizinger commented Nov 11, 2022

@kpp @melekes @divagant-martian Would you mind testing this PR on one of your networks? Thank you 🙏

@divagant-martian
Copy link
Contributor

divagant-martian commented Nov 11, 2022

will do, sounds good if we run over the weekend and report back on monday (Tue in Australia)?

@thomaseizinger
Copy link
Contributor Author

will do, sounds good if we run over the weekend and report back on monday?

Yeah that is totally fine!

@mxinden
Copy link
Member

mxinden commented Nov 14, 2022

@mxinden I've rebased this PR onto the 0.49.0 release. To actually make the patch release, you only need to checkout this branch and go one commit back (git checkout HEAD^).

I would prefer patch releases to go through our pull request workflow as well. I created branch v0.49 based on tag v0.49.0. Can you either redirect this pull request or a new pull request against that branch?

Once that is merged I will cut the patch release. We can then either merge branch v0.49 back into master or cherry-pick the patch into master.

Copy link
Member

@mxinden mxinden left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, this fix looks good to me. Though see comment above before we can merge here.

@divagant-martian
Copy link
Contributor

We didn't see any notable difference, for better or worse, between v0.49.0 and this PR. Hope this helps 🤷

@thomaseizinger
Copy link
Contributor Author

@mxinden I've rebased this PR onto the 0.49.0 release. To actually make the patch release, you only need to checkout this branch and go one commit back (git checkout HEAD^).

I would prefer patch releases to go through our pull request workflow as well. I created branch v0.49 based on tag v0.49.0. Can you either redirect this pull request or a new pull request against that branch?

Can do but I also would like to understand what the benefit is? Are you trying to partly use GitFlow here?

Renaming my branch to v0.49 will be the exact same result so I am not sure what we are achieving here.

Once that is merged I will cut the patch release. We can then either merge branch v0.49 back into master or cherry-pick the patch into master.

There will be merge conflicts, which I already resolved in this PR as part of the last patch.

@thomaseizinger
Copy link
Contributor Author

See #3121.

@mxinden
Copy link
Member

mxinden commented Nov 15, 2022

@mxinden I've rebased this PR onto the 0.49.0 release. To actually make the patch release, you only need to checkout this branch and go one commit back (git checkout HEAD^).

I would prefer patch releases to go through our pull request workflow as well. I created branch v0.49 based on tag v0.49.0. Can you either redirect this pull request or a new pull request against that branch?

Can do but I also would like to understand what the benefit is? Are you trying to partly use GitFlow here?

Renaming my branch to v0.49 will be the exact same result so I am not sure what we are achieving here.

I think I misunderstood your comment in #3071 (comment). No benefit from my proposed workflow.

Once that is merged I will cut the patch release. We can then either merge branch v0.49 back into master or cherry-pick the patch into master.

There will be merge conflicts, which I already resolved in this PR as part of the last patch.

Got it. Thanks. I will merge #3121 and cut a release. Then we can merge this pull request into master.

@mergify mergify bot merged commit 9dadf5c into master Nov 15, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Swarm does not honour max_negotiating_inbound_streams setting
4 participants