Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[examples]: update pubsub examples #705

Merged
merged 10 commits into from
Apr 2, 2022
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.8", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }

[[example]]
name = "http"
Expand Down
28 changes: 20 additions & 8 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::time::Duration;

use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -61,21 +65,29 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
std::thread::sleep(std::time::Duration::from_millis(50));
let item = LETTERS.chars().nth(idx);

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
let item = &LETTERS[one..two];

let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
let _ = sink.pipe_from_stream(stream).await;
});
Ok(())
})
Expand Down
61 changes: 48 additions & 13 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,25 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
//! Example that shows how to broadcasts the produced values to active all subscriptions using `mpsc channels`.
//!
//! It's possible to use `tokio::sync::broadcast` too but because the Receiver doesn't implement
//! stream we chose this `mpsc channels` in this example.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

use std::{net::SocketAddr, sync::Arc};

use futures::channel::mpsc;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
use tokio::sync::Mutex;

const NUM_SUBSCRIPTION_RESPONSES: usize = 5;

/// Sinks that can be shared across threads.
type SharedSinks = Arc<Mutex<Vec<mpsc::UnboundedSender<i32>>>>;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
Expand All @@ -45,13 +54,12 @@ async fn main() -> anyhow::Result<()> {
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let mut subscribe_hello: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let mut sub: Subscription<i32> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

let mut i = 0;
while i <= NUM_SUBSCRIPTION_RESPONSES {
let r = subscribe_hello.next().await;
tracing::info!("received {:?}", r);
let r = sub.next().await.unwrap().unwrap();
tracing::info!("{}", r);
i += 1;
}

Expand All @@ -60,17 +68,44 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
}
std::thread::sleep(std::time::Duration::from_secs(1));
let sinks = SharedSinks::default();
let mut module = RpcModule::new(sinks.clone());

// Produce new items for the server to publish.
tokio::spawn(produce_items(sinks));

module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, sink, ctx| {
let ctx = ctx.clone();
tokio::spawn(async move {
let (tx, rx) = mpsc::unbounded();
ctx.lock().await.push(tx);
let _ = sink.pipe_from_stream(rx).await;
});
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}

/// Produce new values that are sent to each active subscription.
async fn produce_items(sinks: SharedSinks) {
let mut count = 0;
loop {
let mut to_remove = Vec::new();

for (idx, sink) in sinks.lock().await.iter().enumerate() {
if sink.unbounded_send(count).is_err() {
to_remove.push(idx);
}
}

// If the channel is closed remove that channel.
for rm in to_remove {
sinks.lock().await.remove(rm);
}

count += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}