Skip to content

Commit

Permalink
feat: add SubscriptionSink::pipe_from_try_stream to support streams…
Browse files Browse the repository at this point in the history
… that returns `Result` (#720)

* refactor: make `pipe_from_stream` take stream of result

The rationale for this is that it is more flexible for use cases when `Stream<Item = Result<T, Error>>`.

Take for example `tokio_stream::Broadcast` then one would have to something like:

```rust
   let stream = BroadcastStream::new(rx).take_while(|r| future::ready(r.is_ok())).filter_map(|r| future::ready(r.ok()));
```

Of course it's a bit awkward to return `Result` when the underlying stream can't fail but I think that's fair trade-off
here.

* Update core/src/server/rpc_module.rs

Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com>

* pipe_from_stream: make E: Display instead of StdError

* add a test

* add `pipe_from_try_stream` API to support `TryStream`

* Update tests/tests/integration_tests.rs

* Update proc-macros/src/lib.rs

Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com>
  • Loading branch information
niklasad1 and TarikGul committed Apr 1, 2022
1 parent 14b1b4b commit 545ceaf
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl SubscriptionClosed {

/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub enum SubscriptionClosedReason {
/// The subscription was closed by calling the unsubscribe method.
Unsubscribed,
Expand Down
57 changes: 47 additions & 10 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::traits::{IdProvider, ToRpcParams};
use futures_channel::{mpsc, oneshot};
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{ErrorCode, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
Expand Down Expand Up @@ -769,36 +769,41 @@ impl SubscriptionSink {

/// Consumes the `SubscriptionSink` and reads data from the `stream` and sends back data on the subscription
/// when items gets produced by the stream.
/// The underlying stream must produce `Result values, see [`futures_util::TryStream`] for further information.
///
/// Returns `Ok(())` if the stream or connection was terminated.
/// Returns `Err(_)` if one of the items couldn't be serialized.
/// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]);
/// tokio::spawn(sink.pipe_from_stream(stream));
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// tokio::spawn(sink.pipe_from_try_stream(stream));
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(mut self, mut stream: S) -> Result<(), Error>
pub async fn pipe_from_try_stream<S, T, E>(mut self, mut stream: S) -> Result<(), Error>
where
S: Stream<Item = T> + Unpin,
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
E: std::fmt::Display,
{
if let Some(close_notify) = self.close_notify.clone() {
let mut stream_item = stream.next();
let mut stream_item = stream.try_next();
let closed_fut = close_notify.notified();
pin_mut!(closed_fut);
loop {
match futures_util::future::select(stream_item, closed_fut).await {
// The app sent us a value to send back to the subscribers
Either::Left((Some(result), next_closed_fut)) => {
Either::Left((Ok(Some(result)), next_closed_fut)) => {
match self.send(&result) {
Ok(_) => (),
Err(Error::SubscriptionClosed(close_reason)) => {
Expand All @@ -809,11 +814,16 @@ impl SubscriptionSink {
break Err(err);
}
};
stream_item = stream.next();
stream_item = stream.try_next();
closed_fut = next_closed_fut;
}
Either::Left((Err(e), _)) => {
let close_reason = SubscriptionClosedReason::Server(e.to_string()).into();
self.close(&close_reason);
break Err(Error::SubscriptionClosed(close_reason));
}
// Stream terminated.
Either::Left((None, _)) => break Ok(()),
Either::Left((Ok(None), _)) => break Ok(()),
// The subscriber went away without telling us.
Either::Right(((), _)) => {
self.close(&SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset));
Expand All @@ -827,6 +837,33 @@ impl SubscriptionSink {
}
}

/// Similar to [`SubscriptionSink::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
///
/// Warning: it's possible to pass in a stream that returns `Result` if `Result: Serialize` is satisfied
/// but it won't cancel the stream when an error occurs. If you want the stream to be canceled when an
/// error occurs use [`SubscriptionSink::pipe_from_try_stream`] instead.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1, 2, 3]);
/// tokio::spawn(sink.pipe_from_stream(stream));
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(self, stream: S) -> Result<(), Error>
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
}

/// Returns whether this channel is closed without needing a context.
pub fn is_closed(&self) -> bool {
self.inner.is_closed() || self.close_notify.is_none()
Expand Down
38 changes: 38 additions & 0 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,44 @@ async fn ws_server_cancels_stream_after_reset_conn() {
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped");
}

#[tokio::test]
async fn ws_server_cancels_sub_stream_after_err() {
use jsonrpsee::{ws_server::WsServerBuilder, RpcModule};

let err: &'static str = "error on the stream";
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let server_url = format!("ws://{}", server.local_addr().unwrap());

let mut module = RpcModule::new(());

module
.register_subscription(
"subscribe_with_err_on_stream",
"n",
"unsubscribe_with_err_on_stream",
move |_, sink, _| {
// create stream that produce an error which will cancel the subscription.
let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]);
tokio::spawn(async move {
let _ = sink.pipe_from_try_stream(stream).await;
});
Ok(())
},
)
.unwrap();

server.start(module).unwrap();

let client = WsClientBuilder::default().build(&server_url).await.unwrap();
let mut sub: Subscription<usize> =
client.subscribe("subscribe_with_err_on_stream", None, "unsubscribe_with_err_on_stream").await.unwrap();

assert_eq!(sub.next().await.unwrap().unwrap(), 1);
let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string()));
// The server closed down the subscription with the underlying error from the stream.
assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp));
}

#[tokio::test]
async fn ws_server_subscribe_with_stream() {
use futures::StreamExt;
Expand Down

0 comments on commit 545ceaf

Please sign in to comment.