Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[examples]: update pubsub examples #705

Merged
merged 10 commits into from
Apr 2, 2022
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.8", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }

[[example]]
name = "http"
Expand Down
28 changes: 20 additions & 8 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::time::Duration;

use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -61,21 +65,29 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
std::thread::sleep(std::time::Duration::from_millis(50));
let item = LETTERS.chars().nth(idx);

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
let item = &LETTERS[one..two];

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
Expand Down
56 changes: 40 additions & 16 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example that shows how to broadcasts the produced values to all active subscriptions using `tokio::sync::broadcast`.

use std::net::SocketAddr;
use std::time::Duration;

use futures::future;
use futures::StreamExt;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

const NUM_SUBSCRIPTION_RESPONSES: usize = 5;

Expand All @@ -44,33 +50,51 @@ async fn main() -> anyhow::Result<()> {
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let mut subscribe_hello: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let client1 = WsClientBuilder::default().build(&url).await?;
let client2 = WsClientBuilder::default().build(&url).await?;
let sub1: Subscription<i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub2: Subscription<i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

let mut i = 0;
while i <= NUM_SUBSCRIPTION_RESPONSES {
let r = subscribe_hello.next().await;
tracing::info!("received {:?}", r);
i += 1;
}
let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) });
let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) });

future::join(fut1, fut2).await;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
}
std::thread::sleep(std::time::Duration::from_secs(1));
let (tx, _) = broadcast::channel(1024);

tokio::spawn(produce_items(tx.clone()));

module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| {
let rx = tx.subscribe();

// Convert stream from `Item = Result<T: Serialize, Error>` to `Item = T::Serialize`.
let stream =
BroadcastStream::new(rx).take_while(|r| future::ready(r.is_ok())).filter_map(|r| future::ready(r.ok()));
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}

// Naive example that broadcasts the produced values to all subscribers.
async fn produce_items(tx: broadcast::Sender<i32>) {
let mut i = 0;
loop {
// This might fail if no receivers are alive
// could occur if no subscriptions are active...
let _ = tx.send(i);
i += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}