From 616468751aaafc5112f9ecce1d3ad8aa24767b2a Mon Sep 17 00:00:00 2001 From: David Date: Tue, 18 May 2021 11:58:52 +0200 Subject: [PATCH] Subscription example (#324) * Add a test for calling methods with multiple params of multiple types (#308) * Add a test for calling methods with multiple params of multiple types * cargo fmt Co-authored-by: Niklas Adolfsson * [ws client] RegisterNotification support (#303) * Rename NotifResponse to SubscriptionResponse to make room for new impl * Add support for on_notification Subscription types * Fix handling of NotificationHandler in manager * cleanup * Implement NotificationHandler to replace Subscription and clean up plumbing * More cleanup * impl Drop for NotificationHandler * Address pr feedback #1 * ws client register_notification pr feedback 2 * Fix doc * fix typo * Add tests, get NH working * More cleanup of String/&str * fix doc * Drop notification handler on send_back_sink error * ws client notification auto unsubscribe when channel full test * Change order of type params to register_method (#312) * Change order of type params to register_method * Cleanup and fmt * Update ws-server/src/tests.rs Co-authored-by: Niklas Adolfsson * CI: optimize caching (#317) * Bump actions/checkout from 2 to 2.3.4 (#315) Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 2.3.4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v2.3.4) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump actions-rs/cargo from 1 to 1.0.3 (#314) Bumps [actions-rs/cargo](https://github.com/actions-rs/cargo) from 1 to 1.0.3. - [Release notes](https://github.com/actions-rs/cargo/releases) - [Changelog](https://github.com/actions-rs/cargo/blob/master/CHANGELOG.md) - [Commits](https://github.com/actions-rs/cargo/compare/v1...v1.0.3) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump actions-rs/toolchain from 1 to 1.0.7 (#313) Bumps [actions-rs/toolchain](https://github.com/actions-rs/toolchain) from 1 to 1.0.7. - [Release notes](https://github.com/actions-rs/toolchain/releases) - [Changelog](https://github.com/actions-rs/toolchain/blob/master/CHANGELOG.md) - [Commits](https://github.com/actions-rs/toolchain/compare/v1...v1.0.7) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [ws server]: add logs (#319) * WIP - hangs * fix example * cleanup * Add certificate_store() to WsClientBuilder (#321) * Add custom_certificate to WsClientBuilder * Use system certs instead of specified file * Cache client_config * Move client_config logic to fn build * Default use_system_certificates to true * Move out connector * Add CertificateStore type * cargo fmt * cargo clippy * Resolve comment: Rename variable * Resolved comments Co-authored-by: Niklas Adolfsson Co-authored-by: Billy Lindeman Co-authored-by: Denis Pisarev Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Albin Hedman --- examples/Cargo.toml | 4 ++ examples/ws_sub_with_params.rs | 82 ++++++++++++++++++++++++++++++++++ examples/ws_subscription.rs | 4 +- ws-server/src/server/module.rs | 2 +- 4 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 examples/ws_sub_with_params.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 251259e9ee..dbd15dbdab 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -25,6 +25,10 @@ path = "ws.rs" name = "ws_subscription" path = "ws_subscription.rs" +[[example]] +name = "ws_sub_with_params" +path = "ws_sub_with_params.rs" + [[example]] name = "proc_macro" path = "proc_macro.rs" diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs new file mode 100644 index 0000000000..9e77ea333c --- /dev/null +++ b/examples/ws_sub_with_params.rs @@ -0,0 +1,82 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use jsonrpsee::{ + ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder}, + ws_server::WsServer, +}; +use std::net::SocketAddr; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + let addr = run_server().await?; + let url = format!("ws://{}", addr); + + let client = WsClientBuilder::default().build(&url).await?; + + // Subscription with a single parameter + let params = JsonRpcParams::Array(vec![3.into()]); + let mut sub_params_one = client.subscribe::>("sub_one_param", params, "unsub_one_param").await?; + println!("subscription with one param: {:?}", sub_params_one.next().await); + + // Subscription with multiple parameters + let params = JsonRpcParams::Array(vec![2.into(), 5.into()]); + let mut sub_params_two = client.subscribe::("sub_params_two", params, "unsub_params_two").await?; + println!("subscription with two params: {:?}", sub_params_two.next().await); + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + const LETTERS: &'static str = "abcdefghijklmnopqrstuvxyz"; + let mut server = WsServer::new("127.0.0.1:0").await?; + let one_param = server.register_subscription("sub_one_param", "unsub_one_param").unwrap(); + let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap(); + + std::thread::spawn(move || loop { + for sink_params in one_param.extract_with_input().iter() { + let idx = *sink_params.params(); + let result = LETTERS.chars().nth(idx); + let _ = sink_params.send(&result); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + }); + + std::thread::spawn(move || loop { + for sink_params in two_params.extract_with_input().iter() { + let params: &Vec = sink_params.params(); + // Validate your params here: check len, check > 0 etc + let result = LETTERS[params[0]..params[1]].to_string(); + let _ = sink_params.send(&result); + } + std::thread::sleep(std::time::Duration::from_millis(100)); + }); + + let addr = server.local_addr(); + tokio::spawn(async move { server.start().await }); + addr +} diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index cba815c3a7..73bd687ece 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -54,10 +54,10 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let mut server = WsServer::new("127.0.0.1:0").await?; - let mut subscription = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); + let mut subscription = server.register_subscription::("subscribe_hello", "unsubscribe_hello").unwrap(); std::thread::spawn(move || loop { - subscription.send(&"hello my friend").unwrap(); + subscription.send_all(&"hello my friend").unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); }); diff --git a/ws-server/src/server/module.rs b/ws-server/src/server/module.rs index 124e59897d..eb5a5d77b2 100644 --- a/ws-server/src/server/module.rs +++ b/ws-server/src/server/module.rs @@ -86,7 +86,7 @@ impl RpcModule { self.methods.insert( subscribe_method_name, Box::new(move |id, params, tx, conn| { - let params = params.parse().ok(); + let params = params.parse().or_else(|_| params.one().map_err(|_| CallError::InvalidParams)).ok(); let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11;