diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 251259e9ee..dbd15dbdab 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -25,6 +25,10 @@ path = "ws.rs" name = "ws_subscription" path = "ws_subscription.rs" +[[example]] +name = "ws_sub_with_params" +path = "ws_sub_with_params.rs" + [[example]] name = "proc_macro" path = "proc_macro.rs" diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs new file mode 100644 index 0000000000..9e77ea333c --- /dev/null +++ b/examples/ws_sub_with_params.rs @@ -0,0 +1,82 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use jsonrpsee::{ + ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder}, + ws_server::WsServer, +}; +use std::net::SocketAddr; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + let addr = run_server().await?; + let url = format!("ws://{}", addr); + + let client = WsClientBuilder::default().build(&url).await?; + + // Subscription with a single parameter + let params = JsonRpcParams::Array(vec![3.into()]); + let mut sub_params_one = client.subscribe::>("sub_one_param", params, "unsub_one_param").await?; + println!("subscription with one param: {:?}", sub_params_one.next().await); + + // Subscription with multiple parameters + let params = JsonRpcParams::Array(vec![2.into(), 5.into()]); + let mut sub_params_two = client.subscribe::("sub_params_two", params, "unsub_params_two").await?; + println!("subscription with two params: {:?}", sub_params_two.next().await); + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + const LETTERS: &'static str = "abcdefghijklmnopqrstuvxyz"; + let mut server = WsServer::new("127.0.0.1:0").await?; + let one_param = server.register_subscription("sub_one_param", "unsub_one_param").unwrap(); + let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap(); + + std::thread::spawn(move || loop { + for sink_params in one_param.extract_with_input().iter() { + let idx = *sink_params.params(); + let result = LETTERS.chars().nth(idx); + let _ = sink_params.send(&result); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + }); + + std::thread::spawn(move || loop { + for sink_params in two_params.extract_with_input().iter() { + let params: &Vec = sink_params.params(); + // Validate your params here: check len, check > 0 etc + let result = LETTERS[params[0]..params[1]].to_string(); + let _ = sink_params.send(&result); + } + std::thread::sleep(std::time::Duration::from_millis(100)); + }); + + let addr = server.local_addr(); + tokio::spawn(async move { server.start().await }); + addr +} diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index cba815c3a7..73bd687ece 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -54,10 +54,10 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let mut server = WsServer::new("127.0.0.1:0").await?; - let mut subscription = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); + let mut subscription = server.register_subscription::("subscribe_hello", "unsubscribe_hello").unwrap(); std::thread::spawn(move || loop { - subscription.send(&"hello my friend").unwrap(); + subscription.send_all(&"hello my friend").unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); }); diff --git a/ws-server/src/server/module.rs b/ws-server/src/server/module.rs index 124e59897d..eb5a5d77b2 100644 --- a/ws-server/src/server/module.rs +++ b/ws-server/src/server/module.rs @@ -86,7 +86,7 @@ impl RpcModule { self.methods.insert( subscribe_method_name, Box::new(move |id, params, tx, conn| { - let params = params.parse().ok(); + let params = params.parse().or_else(|_| params.one().map_err(|_| CallError::InvalidParams)).ok(); let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11;