Skip to content

Commit

Permalink
fix(client): close subscription when server sent SubscriptionClosed
Browse files Browse the repository at this point in the history
… notification (#721)

* fix(client): close subscription when server sent `SubscriptionClosed` notification

* Update core/src/client/async_client/helpers.rs
  • Loading branch information
niklasad1 committed Apr 1, 2022
1 parent 545ceaf commit f55ab3e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
12 changes: 11 additions & 1 deletion core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::time::Duration;

use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT};
use crate::error::SubscriptionClosed;
use crate::Error;

use futures_channel::{mpsc, oneshot};
Expand Down Expand Up @@ -84,7 +85,16 @@ pub(crate) fn process_subscription_response(
};

match manager.as_subscription_mut(&request_id) {
Some(send_back_sink) => match send_back_sink.try_send(response.params.result) {
Some(send_back_sink) => match send_back_sink.try_send(response.params.result.clone()) {
// The server sent a subscription closed notification, then close down the subscription.
Ok(()) if serde_json::from_value::<SubscriptionClosed>(response.params.result).is_ok() => {
if manager.remove_subscription(request_id, sub_id.clone()).is_some() {
Ok(())
} else {
tracing::error!("The server tried to close down an invalid subscription: {:?}", sub_id);
Err(None)
}
}
Ok(()) => Ok(()),
Err(err) => {
tracing::error!("Dropping subscription {:?} error: {:?}", sub_id, err);
Expand Down
1 change: 1 addition & 0 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ async fn ws_server_cancels_sub_stream_after_err() {
let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string()));
// The server closed down the subscription with the underlying error from the stream.
assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp));
assert!(sub.next().await.is_none());
}

#[tokio::test]
Expand Down

0 comments on commit f55ab3e

Please sign in to comment.