Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SubscriptionSink::pipe_from_try_stream to support streams that returns Result #720

Merged
merged 8 commits into from
Apr 1, 2022
2 changes: 1 addition & 1 deletion core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl SubscriptionClosed {

/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub enum SubscriptionClosedReason {
/// The subscription was closed by calling the unsubscribe method.
Unsubscribed,
Expand Down
19 changes: 14 additions & 5 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,25 +764,29 @@ impl SubscriptionSink {
/// when items gets produced by the stream.
///
/// Returns `Ok(())` if the stream or connection was terminated.
/// Returns `Err(_)` if one of the items couldn't be serialized.
/// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]);
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32)]` to the subscriber
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// // because after the `Err(_)` the stream is terminated.
/// tokio::spawn(sink.pipe_from_stream(stream));
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(mut self, mut stream: S) -> Result<(), Error>
pub async fn pipe_from_stream<S, T, E>(mut self, mut stream: S) -> Result<(), Error>
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
where
S: Stream<Item = T> + Unpin,
S: Stream<Item = Result<T, E>> + Unpin,
T: Serialize,
E: std::fmt::Display,
{
if let Some(close_notify) = self.close_notify.clone() {
let mut stream_item = stream.next();
Expand All @@ -791,7 +795,7 @@ impl SubscriptionSink {
loop {
match futures_util::future::select(stream_item, closed_fut).await {
// The app sent us a value to send back to the subscribers
Either::Left((Some(result), next_closed_fut)) => {
Either::Left((Some(Ok(result)), next_closed_fut)) => {
match self.send(&result) {
Ok(_) => (),
Err(Error::SubscriptionClosed(close_reason)) => {
Expand All @@ -805,6 +809,11 @@ impl SubscriptionSink {
stream_item = stream.next();
closed_fut = next_closed_fut;
}
Either::Left((Some(Err(e)), _)) => {
let close_reason = SubscriptionClosedReason::Server(e.to_string()).into();
self.close(&close_reason);
break Err(Error::SubscriptionClosed(close_reason));
}
// Stream terminated.
Either::Left((None, _)) => break Ok(()),
// The subscriber went away without telling us.
Expand Down
2 changes: 1 addition & 1 deletion proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub(crate) mod visitor;
/// // as subscription responses.
/// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
/// tokio::spawn(async move {
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// let stream = futures_util::stream::iter([Ok::<_, &str>("one"), Ok("two"), Ok("three")]);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// sink.pipe_from_stream(stream).await;
/// });
///
Expand Down
43 changes: 41 additions & 2 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async fn ws_server_cancels_stream_after_reset_conn() {
module
.register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| {
// create stream that doesn't produce items.
let stream = futures::stream::empty::<usize>();
let stream = futures::stream::empty::<usize>().map(|i| Ok::<_, Error>(i));
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
tokio::spawn(async move {
sink.pipe_from_stream(stream).await.unwrap();
let send_back = Arc::make_mut(&mut tx);
Expand All @@ -423,6 +423,44 @@ async fn ws_server_cancels_stream_after_reset_conn() {
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped");
}

#[tokio::test]
async fn ws_server_cancels_sub_stream_after_err() {
use jsonrpsee::{ws_server::WsServerBuilder, RpcModule};

let err: &'static str = "error on the stream";
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let server_url = format!("ws://{}", server.local_addr().unwrap());

let mut module = RpcModule::new(());

module
.register_subscription(
"subscribe_with_err_on_stream",
"n",
"unsubscribe_with_err_on_stream",
move |_, sink, _| {
// create stream that produce an error which will cancel the subscription.
let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]);
tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
},
)
.unwrap();

server.start(module).unwrap();

let client = WsClientBuilder::default().build(&server_url).await.unwrap();
let mut sub: Subscription<usize> =
client.subscribe("subscribe_with_err_on_stream", None, "unsubscribe_with_err_on_stream").await.unwrap();

assert_eq!(sub.next().await.unwrap().unwrap(), 1);
let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string()));
// The server closed down the subscription with the underlying error from the stream.
assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp));
Copy link
Collaborator

@jsdw jsdw Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth just calling sub.next().await once more after this to confirm that it's None and that the thing definitely won't send any more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, just found a bug let's tackle it another PR.

}

#[tokio::test]
async fn ws_server_subscribe_with_stream() {
use futures::StreamExt;
Expand All @@ -437,7 +475,8 @@ async fn ws_server_subscribe_with_stream() {
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, sink, _| {
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
let stream =
IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| Ok::<_, Error>(c));

sink.pipe_from_stream(stream).await.unwrap();
});
Expand Down