Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[examples]: update pubsub examples #705

Merged
merged 10 commits into from
Apr 2, 2022
9 changes: 5 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.8", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }

[[example]]
name = "http"
Expand All @@ -36,12 +37,12 @@ name = "ws"
path = "ws.rs"

[[example]]
name = "ws_subscription"
path = "ws_subscription.rs"
name = "ws_pubsub_broadcast"
path = "ws_pubsub_broadcast.rs"

[[example]]
name = "ws_sub_with_params"
path = "ws_sub_with_params.rs"
name = "ws_pubsub_with_params"
path = "ws_pubsub_with_params.rs"

[[example]]
name = "proc_macro"
Expand Down
53 changes: 37 additions & 16 deletions examples/ws_subscription.rs → examples/ws_pubsub_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example that shows how to broadcasts the produced values to all active subscriptions using `tokio::sync::broadcast`.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

use std::net::SocketAddr;

use futures::future;
use futures::StreamExt;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

const NUM_SUBSCRIPTION_RESPONSES: usize = 5;

Expand All @@ -44,33 +49,49 @@ async fn main() -> anyhow::Result<()> {
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let mut subscribe_hello: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let client1 = WsClientBuilder::default().build(&url).await?;
let client2 = WsClientBuilder::default().build(&url).await?;
let sub1: Subscription<i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub2: Subscription<i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

let mut i = 0;
while i <= NUM_SUBSCRIPTION_RESPONSES {
let r = subscribe_hello.next().await;
tracing::info!("received {:?}", r);
i += 1;
}
let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) });
let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) });

future::join(fut1, fut2).await;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
}
std::thread::sleep(std::time::Duration::from_secs(1));
let (tx, _rx) = broadcast::channel(16);
let tx2 = tx.clone();

std::thread::spawn(move || produce_items(tx2));

module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| {
let rx = BroadcastStream::new(tx.clone().subscribe());

tokio::spawn(async move {
let _ = sink.pipe_from_try_stream(rx).await;
});
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}

// Naive example that broadcasts the produced values to all active subscribers.
fn produce_items(tx: broadcast::Sender<usize>) {
for c in 1..=100 {
std::thread::sleep(std::time::Duration::from_secs(1));

// This might fail if no receivers are alive, could occur if no subscriptions are active...
// Also be aware that this will succeed when at least one receiver is alive
// Thus, clients connecting at different point in time will not receive
// the items sent before the subscription got established.
let _ = tx.send(c);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::time::Duration;

use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -61,21 +65,29 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
std::thread::sleep(std::time::Duration::from_millis(50));
let item = LETTERS.chars().nth(idx);

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
let item = &LETTERS[one..two];

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
Expand Down