Skip to content

Commit

Permalink
[examples]: update pubsub examples (#705)
Browse files Browse the repository at this point in the history
* update pubsub examples

* Update examples/ws_subscription.rs

* remove some docs

* remove needless clone

* simplify example

* simplify code with async-broadcast channel

* use tokio broadcast for smaller dependency tree

* Update examples/ws_pubsub_broadcast.rs
  • Loading branch information
niklasad1 committed Apr 2, 2022
1 parent 34c2fbe commit 961e6bd
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 28 deletions.
9 changes: 5 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.8", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }

[[example]]
name = "http"
Expand All @@ -36,12 +37,12 @@ name = "ws"
path = "ws.rs"

[[example]]
name = "ws_subscription"
path = "ws_subscription.rs"
name = "ws_pubsub_broadcast"
path = "ws_pubsub_broadcast.rs"

[[example]]
name = "ws_sub_with_params"
path = "ws_sub_with_params.rs"
name = "ws_pubsub_with_params"
path = "ws_pubsub_with_params.rs"

[[example]]
name = "proc_macro"
Expand Down
53 changes: 37 additions & 16 deletions examples/ws_subscription.rs → examples/ws_pubsub_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example that shows how to broadcast to all active subscriptions using `tokio::sync::broadcast`.

use std::net::SocketAddr;

use futures::future;
use futures::StreamExt;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

const NUM_SUBSCRIPTION_RESPONSES: usize = 5;

Expand All @@ -44,33 +49,49 @@ async fn main() -> anyhow::Result<()> {
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let mut subscribe_hello: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let client1 = WsClientBuilder::default().build(&url).await?;
let client2 = WsClientBuilder::default().build(&url).await?;
let sub1: Subscription<i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub2: Subscription<i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

let mut i = 0;
while i <= NUM_SUBSCRIPTION_RESPONSES {
let r = subscribe_hello.next().await;
tracing::info!("received {:?}", r);
i += 1;
}
let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) });
let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) });

future::join(fut1, fut2).await;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
}
std::thread::sleep(std::time::Duration::from_secs(1));
let (tx, _rx) = broadcast::channel(16);
let tx2 = tx.clone();

std::thread::spawn(move || produce_items(tx2));

module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| {
let rx = BroadcastStream::new(tx.clone().subscribe());

tokio::spawn(async move {
let _ = sink.pipe_from_try_stream(rx).await;
});
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}

// Naive example that broadcasts the produced values to all active subscribers.
fn produce_items(tx: broadcast::Sender<usize>) {
for c in 1..=100 {
std::thread::sleep(std::time::Duration::from_secs(1));

// This might fail if no receivers are alive, could occur if no subscriptions are active...
// Also be aware that this will succeed when at least one receiver is alive
// Thus, clients connecting at different point in time will not receive
// the items sent before the subscription got established.
let _ = tx.send(c);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::time::Duration;

use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -61,21 +65,29 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
std::thread::sleep(std::time::Duration::from_millis(50));
let item = LETTERS.chars().nth(idx);

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
let item = &LETTERS[one..two];

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
Expand Down

0 comments on commit 961e6bd

Please sign in to comment.