Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SubscriptionSink::pipe_from_try_stream to support streams that returns Result #720

Merged
merged 8 commits into from
Apr 1, 2022
2 changes: 1 addition & 1 deletion core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl SubscriptionClosed {

/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub enum SubscriptionClosedReason {
/// The subscription was closed by calling the unsubscribe method.
Unsubscribed,
Expand Down
57 changes: 47 additions & 10 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::traits::{IdProvider, ToRpcParams};
use futures_channel::{mpsc, oneshot};
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{ErrorCode, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse,
Expand Down Expand Up @@ -762,36 +762,41 @@ impl SubscriptionSink {

/// Consumes the `SubscriptionSink` and reads data from the `stream` and sends back data on the subscription
/// when items gets produced by the stream.
/// The underlying stream must produce `Result values, see [`futures_util::TryStream`] for further information.
///
/// Returns `Ok(())` if the stream or connection was terminated.
/// Returns `Err(_)` if one of the items couldn't be serialized.
/// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]);
/// tokio::spawn(sink.pipe_from_stream(stream));
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// tokio::spawn(sink.pipe_from_try_stream(stream));
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(mut self, mut stream: S) -> Result<(), Error>
pub async fn pipe_from_try_stream<S, T, E>(mut self, mut stream: S) -> Result<(), Error>
where
S: Stream<Item = T> + Unpin,
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
E: std::fmt::Display,
{
if let Some(close_notify) = self.close_notify.clone() {
let mut stream_item = stream.next();
let mut stream_item = stream.try_next();
let closed_fut = close_notify.notified();
pin_mut!(closed_fut);
loop {
match futures_util::future::select(stream_item, closed_fut).await {
// The app sent us a value to send back to the subscribers
Either::Left((Some(result), next_closed_fut)) => {
Either::Left((Ok(Some(result)), next_closed_fut)) => {
match self.send(&result) {
Ok(_) => (),
Err(Error::SubscriptionClosed(close_reason)) => {
Expand All @@ -802,11 +807,16 @@ impl SubscriptionSink {
break Err(err);
}
};
stream_item = stream.next();
stream_item = stream.try_next();
closed_fut = next_closed_fut;
}
Either::Left((Err(e), _)) => {
let close_reason = SubscriptionClosedReason::Server(e.to_string()).into();
self.close(&close_reason);
break Err(Error::SubscriptionClosed(close_reason));
}
// Stream terminated.
Either::Left((None, _)) => break Ok(()),
Either::Left((Ok(None), _)) => break Ok(()),
// The subscriber went away without telling us.
Either::Right(((), _)) => {
self.close(&SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset));
Expand All @@ -820,6 +830,33 @@ impl SubscriptionSink {
}
}

/// Similar to [`SubscriptionSink::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
///
/// Warning: it's possible to pass in a stream that returns `Result` if `Result: Serialize` is satisfied
/// but it won't cancel the stream when an error occurs. If you want the stream to be canceled when an
/// error occurs use [`SubscriptionSink::pipe_from_try_stream`] instead.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1, 2, 3]);
/// tokio::spawn(sink.pipe_from_stream(stream));
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(self, stream: S) -> Result<(), Error>
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
}

/// Returns whether this channel is closed without needing a context.
pub fn is_closed(&self) -> bool {
self.inner.is_closed() || self.close_notify.is_none()
Expand Down
2 changes: 1 addition & 1 deletion proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub(crate) mod visitor;
/// // as subscription responses.
/// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
/// tokio::spawn(async move {
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// let stream = futures_util::stream::iter([Ok::<_, &str>("one"), Ok("two"), Ok("three")]);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// sink.pipe_from_stream(stream).await;
/// });
///
Expand Down
39 changes: 39 additions & 0 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,45 @@ async fn ws_server_cancels_stream_after_reset_conn() {
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped");
}

#[tokio::test]
async fn ws_server_cancels_sub_stream_after_err() {
use jsonrpsee::{ws_server::WsServerBuilder, RpcModule};

let err: &'static str = "error on the stream";
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let server_url = format!("ws://{}", server.local_addr().unwrap());

let mut module = RpcModule::new(());

module
.register_subscription(
"subscribe_with_err_on_stream",
"n",
"unsubscribe_with_err_on_stream",
move |_, sink, _| {
// create stream that produce an error which will cancel the subscription.
let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]);
tokio::spawn(async move {
let _ = sink.pipe_from_try_stream(stream).await;
});
Ok(())
},
)
.unwrap();

server.start(module).unwrap();

let client = WsClientBuilder::default().build(&server_url).await.unwrap();
let mut sub: Subscription<usize> =
client.subscribe("subscribe_with_err_on_stream", None, "unsubscribe_with_err_on_stream").await.unwrap();

assert_eq!(sub.next().await.unwrap().unwrap(), 1);
let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string()));
// The server closed down the subscription with the underlying error from the stream.
assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp));
Copy link
Collaborator

@jsdw jsdw Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth just calling sub.next().await once more after this to confirm that it's None and that the thing definitely won't send any more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, just found a bug let's tackle it another PR.

sub.next().await.unwrap();
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test]
async fn ws_server_subscribe_with_stream() {
use futures::StreamExt;
Expand Down