fix(ws server): close all subscription when the connection is closed #725

Merged

Conversation

niklasad1
Member

No description provided.

@niklasad1 niklasad1 requested a review from a team as a code owner April 5, 2022 12:26
let mut module = RpcModule::new(tx);

module
    .register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| {
        // create stream that doesn't produce items.
        let stream = futures::stream::empty::<usize>();
Member Author

This test was flaky; I don't know what I was thinking.

Member Author

@niklasad1 niklasad1 Apr 5, 2022

Further, this test could still pass with notify_one: if a new item is produced on the stream and the attempt to send it to the subscriber fails because the connection's receiver is already closed, the subscription terminates anyway.

So this is really tricky to test, but I added a one-hour sleep, so it should be "okay-ish".

// when the connection closes to be on safe side.
close_notify_server_stop.notify_one();
// Notify all listeners and close down associated tasks.
close_notify_server_stop.notify_waiters();
Member Author

This is the fix: notify_waiters instead of notify_one.

Member

Ahh okay, makes sense. So listeners were being launched from pipe_from_try_stream, and because it was notify_one, only a single task was stopped.

Good catch 👍

Member Author

Yeah, basically every connection can have any number of subscriptions, and we want to cancel them all when the connection is killed.

The subscriptions will (or may) be canceled as soon as some message is sent via them, but that might take a while, so we want to cancel them as soon as the connection is closed.
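
To illustrate the point about waking every waiter, here is a minimal sketch that uses only the standard library. It is an analogy, not the jsonrpsee code: `std::sync::Condvar::notify_all` stands in for `Notify::notify_waiters`, threads stand in for subscription tasks, and the function name `close_all_subscriptions` and the shared "closed" flag are hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Spawns `n` hypothetical "subscription" threads that block until the
/// connection is marked closed, then returns how many of them finished.
fn close_all_subscriptions(n: usize) -> usize {
    // Shared state: a "connection closed" flag plus the condition variable.
    let state = Arc::new((Mutex::new(false), Condvar::new()));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                let (closed, cvar) = &*state;
                let guard = closed.lock().unwrap();
                // Blocks until the flag is true; the predicate also guards
                // against spurious wakeups.
                let _guard = cvar.wait_while(guard, |is_closed| !*is_closed).unwrap();
            })
        })
        .collect();

    {
        let (closed, cvar) = &*state;
        *closed.lock().unwrap() = true;
        // Wake every waiter. A single `notify_one` call here would wake at
        // most one blocked thread, so the remaining ones could stay blocked.
        cvar.notify_all();
    }

    // Count the threads that terminated cleanly.
    handles.into_iter().filter_map(|h| h.join().ok()).count()
}
```

Calling `close_all_subscriptions(3)` should return `3`, since every waiting thread is released by the single broadcast.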

Contributor

> yeah, basically every connection can have n number of subscriptions and we want to cancel them all when the connection is killed.

When I wrote this code I went back and forth a bunch on this and in the end I landed on "connections have a single subscription", but that seems a bit dumb now, not sure what I was thinking. :/ Good catch!

assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped");

let rx_len = rx.take(10).fold(0, |acc, _| async move { acc + 1 }).await;
Collaborator

Is there a chance that running drop(client); right after subscribing might mean that we don't get all 10 items back from it?

Member Author

It could take a few milliseconds to send the WS close message, but my understanding is that this is not really a problem: rx here is another channel, whose tx is encapsulated in the RpcModule, so it is not the subscription stream.

Then, after the connection is closed, the actual subscriptions should be terminated via Notify::notify_waiters, and after that these messages are sent to the channel above...
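
The test shape described here (a separate channel that only counts terminated subscriptions) can be sketched with the standard library alone. This is a hedged illustration, not the actual test: the name `count_terminated` is hypothetical, threads stand in for subscription tasks, and dropping a vector of senders stands in for `drop(client)`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the test: returns how many subscription
/// workers reported termination after the "client" was dropped.
fn count_terminated(subscriptions: usize) -> usize {
    // Side channel on which workers report that they have shut down.
    // It is separate from any subscription stream.
    let (done_tx, done_rx) = mpsc::channel::<()>();
    // One keep-alive sender per worker; together they model the connection.
    let mut client = Vec::new();

    for _ in 0..subscriptions {
        let (alive_tx, alive_rx) = mpsc::channel::<()>();
        client.push(alive_tx);
        let done_tx = done_tx.clone();
        thread::spawn(move || {
            // `recv` returns an error once the matching sender is dropped,
            // which is this sketch's "connection closed" signal.
            let _ = alive_rx.recv();
            // Report termination on the side channel.
            let _ = done_tx.send(());
        });
    }
    // Drop the original sender so only the workers keep the channel open.
    drop(done_tx);

    // Closing the "connection" releases every worker at once.
    drop(client);

    // Count termination reports, mirroring `rx.take(10)` in the test.
    done_rx.iter().take(subscriptions).count()
}
```

Because the count is taken on the side channel, dropping the client immediately after subscribing does not lose any reports: `count_terminated(10)` should return `10` once every worker has observed the close.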

Collaborator

@jsdw jsdw left a comment

LGTM from what I understand (and well done for finding this!). I assume we need notify_waiters because more than one thing might be waiting on that to resolve in order to clean up properly (and it definitely seems like the "safe" option regardless).

Member

@TarikGul TarikGul left a comment

LGTM, good catch on the notifications.

In terms of the integration tests, I see how it's difficult to test. Will keep this one in the back of my mind.

@niklasad1 niklasad1 merged commit b687c07 into master Apr 5, 2022
@niklasad1 niklasad1 deleted the na-ws-server-close-all-subscriptions-when-conn-reset branch April 5, 2022 16:29