Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[examples]: update pubsub examples #705

Merged
merged 10 commits into from
Apr 2, 2022
10 changes: 6 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ publish = false

[dev-dependencies]
anyhow = "1"
async-broadcast = "0.3"
env_logger = "0.9"
futures = "0.3"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.8", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }

[[example]]
name = "http"
Expand All @@ -36,12 +38,12 @@ name = "ws"
path = "ws.rs"

[[example]]
name = "ws_subscription"
path = "ws_subscription.rs"
name = "ws_pubsub_broadcast"
path = "ws_pubsub_broadcast.rs"

[[example]]
name = "ws_sub_with_params"
path = "ws_sub_with_params.rs"
name = "ws_pubsub_with_params"
path = "ws_pubsub_with_params.rs"

[[example]]
name = "proc_macro"
Expand Down
47 changes: 31 additions & 16 deletions examples/ws_subscription.rs → examples/ws_pubsub_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example that shows how to broadcasts the produced values to all active subscriptions using `tokio::sync::broadcast`.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

use std::net::SocketAddr;
use std::time::Duration;

use futures::future;
use futures::StreamExt;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
Expand All @@ -44,33 +48,44 @@ async fn main() -> anyhow::Result<()> {
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let mut subscribe_hello: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let client1 = WsClientBuilder::default().build(&url).await?;
let client2 = WsClientBuilder::default().build(&url).await?;
let sub1: Subscription<i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub2: Subscription<i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

let mut i = 0;
while i <= NUM_SUBSCRIPTION_RESPONSES {
let r = subscribe_hello.next().await;
tracing::info!("received {:?}", r);
i += 1;
}
let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) });
let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) });

future::join(fut1, fut2).await;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
}
std::thread::sleep(std::time::Duration::from_secs(1));
let (tx, rx) = async_broadcast::broadcast(16);

tokio::spawn(produce_items(tx.clone()));

module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| {
let rx = rx.clone();

tokio::spawn(async move {
let _ = sink.pipe_from_stream(rx).await;
});
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}

// Naive example that broadcasts the produced values to all subscribers.
async fn produce_items(tx: async_broadcast::Sender<i32>) {
let mut i = 0;
while let Ok(_) = tx.broadcast(i).await {
i += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::time::Duration;

use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -61,21 +65,29 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
std::thread::sleep(std::time::Duration::from_millis(50));
let item = LETTERS.chars().nth(idx);

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
let item = &LETTERS[one..two];

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
Expand Down