Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SubscriptionSink::pipe_from_try_stream to support streams that returns Result #720

Merged
merged 8 commits into from
Apr 1, 2022
15 changes: 10 additions & 5 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,25 +764,27 @@ impl SubscriptionSink {
/// when items gets produced by the stream.
///
/// Returns `Ok(())` if the stream or connection was terminated.
/// Returns `Err(_)` if one of the items couldn't be serialized.
/// Returns `Err(_)` if the underlying stream return an error or if an item from the stream could not be serialized.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]);
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Ok(3), Err(Box::new(anyhow!("error on the stream"))]);
/// tokio::spawn(sink.pipe_from_stream(stream));
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(mut self, mut stream: S) -> Result<(), Error>
pub async fn pipe_from_stream<S, T, E>(mut self, mut stream: S) -> Result<(), Error>
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
where
S: Stream<Item = T> + Unpin,
S: Stream<Item = Result<T, E>> + Unpin,
T: Serialize,
E: std::error::Error,
{
if let Some(close_notify) = self.close_notify.clone() {
let mut stream_item = stream.next();
Expand All @@ -791,7 +793,7 @@ impl SubscriptionSink {
loop {
match futures_util::future::select(stream_item, closed_fut).await {
// The app sent us a value to send back to the subscribers
Either::Left((Some(result), next_closed_fut)) => {
Either::Left((Some(Ok(result)), next_closed_fut)) => {
match self.send(&result) {
Ok(_) => (),
Err(Error::SubscriptionClosed(close_reason)) => {
Expand All @@ -805,6 +807,9 @@ impl SubscriptionSink {
stream_item = stream.next();
closed_fut = next_closed_fut;
}
Either::Left((Some(Err(e)), _)) => {
break Err(Error::Custom(e.to_string()));
}
// Stream terminated.
Either::Left((None, _)) => break Ok(()),
// The subscriber went away without telling us.
Expand Down
5 changes: 3 additions & 2 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async fn ws_server_cancels_stream_after_reset_conn() {
module
.register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| {
// create stream that doesn't produce items.
let stream = futures::stream::empty::<usize>();
let stream = futures::stream::empty::<usize>().map(|i| Ok::<_, Error>(i));
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
tokio::spawn(async move {
sink.pipe_from_stream(stream).await.unwrap();
let send_back = Arc::make_mut(&mut tx);
Expand Down Expand Up @@ -437,7 +437,8 @@ async fn ws_server_subscribe_with_stream() {
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, sink, _| {
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
let stream =
IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| Ok::<_, Error>(c));

sink.pipe_from_stream(stream).await.unwrap();
});
Expand Down