diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index a1b4fda70b..3e494c1715 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -97,6 +97,11 @@ pub fn server_subscription_response(result: Value) -> String { ) } +/// Server originated notification +pub fn server_notification(method: &str, params: Value) -> String { + format!(r#"{{"jsonrpc":"2.0","method":"{}", "params":{} }}"#, method, serde_json::to_string(¶ms).unwrap()) +} + pub async fn http_request(body: Body, uri: Uri) -> Result { let client = hyper::Client::new(); let r = hyper::Request::post(uri) diff --git a/test-utils/src/types.rs b/test-utils/src/types.rs index ec853d58f3..a2cffcb502 100644 --- a/test-utils/src/types.rs +++ b/test-utils/src/types.rs @@ -1,4 +1,5 @@ use futures_channel::mpsc::{self, Receiver, Sender}; +use futures_channel::oneshot; use futures_util::{ future::FutureExt, io::{BufReader, BufWriter}, @@ -95,6 +96,8 @@ pub enum ServerMode { Response(String), // Send out a subscription ID on a request and continuously send out data on the subscription. Subscription { subscription_id: String, subscription_response: String }, + // Send out a notification after timeout + Notification(String), } /// JSONRPC v2 dummy WebSocket server that sends a hardcoded response. @@ -114,6 +117,25 @@ impl WebSocketTestServer { Self { local_addr, exit: tx } } + // Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured `hardcoded notification` for every connection. + pub async fn with_hardcoded_notification(sockaddr: SocketAddr, notification: String) -> Self { + let (tx, rx) = mpsc::channel::<()>(1); + let (addr_tx, addr_rx) = oneshot::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let listener = rt.block_on(async_std::net::TcpListener::bind(sockaddr)).unwrap(); + let local_addr = listener.local_addr().unwrap(); + + addr_tx.send(local_addr).unwrap(); + rt.block_on(server_backend(listener, rx, ServerMode::Notification(notification))); + }); + + let local_addr = addr_rx.await.unwrap(); + + Self { local_addr, exit: tx } + } + // Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured subscription ID and subscription response. // // NOTE: ignores the actual subscription and unsubscription method. @@ -197,15 +219,24 @@ async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mu loop { let next_ws = ws_stream.next().fuse(); let next_exit = exit.next().fuse(); - let time_out = tokio::time::sleep(Duration::from_secs(1)).fuse(); + let time_out = tokio::time::sleep(Duration::from_millis(200)).fuse(); + pin_mut!(time_out, next_exit, next_ws); select! { _ = time_out => { - if let ServerMode::Subscription { subscription_response, .. } = &mode { - if let Err(e) = sender.send_text(&subscription_response).await { - log::warn!("send response to subscription: {:?}", e); - } + match &mode { + ServerMode::Subscription { subscription_response, .. } => { + if let Err(e) = sender.send_text(&subscription_response).await { + log::warn!("send response to subscription: {:?}", e); + } + }, + ServerMode::Notification(n) => { + if let Err(e) = sender.send_text(&n).await { + log::warn!("send notification: {:?}", e); + } + }, + _ => {} } } ws = next_ws => { @@ -217,12 +248,13 @@ async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mu if let Err(e) = sender.send_text(&r).await { log::warn!("send response to request error: {:?}", e); } - } + }, ServerMode::Subscription { subscription_id, .. } => { if let Err(e) = sender.send_text(&subscription_id).await { log::warn!("send subscription id error: {:?}", e); } - } + }, + _ => {} } } } diff --git a/types/src/client.rs b/types/src/client.rs index a902334013..b6e3eb4226 100644 --- a/types/src/client.rs +++ b/types/src/client.rs @@ -17,6 +17,18 @@ pub struct Subscription { pub marker: PhantomData, } +/// Active NotificationHandler on a Client. +pub struct NotificationHandler { + /// Channel to send requests to the background task. + pub to_back: mpsc::Sender, + /// Channel from which we receive notifications from the server, as encoded `JsonValue`s. + pub notifs_rx: mpsc::Receiver, + /// Method Name + pub method: String, + /// Marker in order to pin the `Notif` parameter. + pub marker: PhantomData, +} + /// Batch request message. #[derive(Debug)] pub struct BatchMessage { @@ -56,6 +68,17 @@ pub struct SubscriptionMessage { pub send_back: oneshot::Sender, SubscriptionId), Error>>, } +/// RegisterNotification message. +#[derive(Debug)] +pub struct RegisterNotificationMessage { + /// Method name this notification handler is attached to + pub method: String, + /// We return a [`mpsc::Receiver`] that will receive notifications. + /// When we get a response from the server about that subscription, we send the result over + /// this channel. + pub send_back: oneshot::Sender, String), Error>>, +} + /// Message that the Client can send to the background task. #[derive(Debug)] pub enum FrontToBack { @@ -67,6 +90,10 @@ pub enum FrontToBack { Request(RequestMessage), /// Send a subscription request to the server. Subscribe(SubscriptionMessage), + /// Register a notification handler + RegisterNotification(RegisterNotificationMessage), + /// Unregister a notification handler + UnregisterNotification(String), /// When a subscription channel is closed, we send this message to the background /// task to mark it ready for garbage collection. // NOTE: It is not possible to cancel pending subscriptions or pending requests. @@ -97,6 +124,28 @@ where } } +impl NotificationHandler +where + Notif: DeserializeOwned, +{ + /// Returns the next notification from the stream + /// This may return `None` if the method has been unregistered, + /// may happen if the channel becomes full or is dropped. + /// + /// Ignores any malformed packet. + pub async fn next(&mut self) -> Option { + loop { + match self.notifs_rx.next().await { + Some(n) => match serde_json::from_value(n) { + Ok(parsed) => return Some(parsed), + Err(e) => log::debug!("NotificationHandler response error: {:?}", e), + }, + None => return None, + } + } + } +} + impl Drop for Subscription { fn drop(&mut self) { // We can't actually guarantee that this goes through. If the background task is busy, then @@ -107,3 +156,12 @@ impl Drop for Subscription { let _ = self.to_back.send(FrontToBack::SubscriptionClosed(id)).now_or_never(); } } + +impl Drop for NotificationHandler { + fn drop(&mut self) { + // We can't actually guarantee that this goes through. If the background task is busy, then + // the channel's buffer will be full, and our unregister request will never make it. + let notif_method = std::mem::take(&mut self.method); + let _ = self.to_back.send(FrontToBack::UnregisterNotification(notif_method)).now_or_never(); + } +} diff --git a/types/src/error.rs b/types/src/error.rs index 75212b3882..e83feb21ac 100644 --- a/types/src/error.rs +++ b/types/src/error.rs @@ -67,6 +67,9 @@ pub enum Error { /// Invalid request ID. #[error("Invalid request ID")] InvalidRequestId, + /// Client received a notification with an unregistered method + #[error("Unregistered notification method")] + UnregisteredNotification(String), /// A request with the same request ID has already been registered. #[error("A request with the same request ID has already been registered")] DuplicateRequestId, diff --git a/types/src/traits.rs b/types/src/traits.rs index 96bafbdc3a..0666227ac1 100644 --- a/types/src/traits.rs +++ b/types/src/traits.rs @@ -1,5 +1,5 @@ use crate::v2::params::{JsonRpcParams, RpcParams}; -use crate::{Error, Subscription}; +use crate::{Error, NotificationHandler, Subscription}; use async_trait::async_trait; use serde::de::DeserializeOwned; @@ -44,6 +44,11 @@ pub trait SubscriptionClient: Client { ) -> Result, Error> where Notif: DeserializeOwned; + + /// Register a NotificationHandler that will listen for incoming JSON-RPC notifications + async fn register_notification<'a, Notif>(&self, method: &'a str) -> Result, Error> + where + Notif: DeserializeOwned; } /// JSON-RPC server interface for managing method calls. diff --git a/types/src/v2/response.rs b/types/src/v2/response.rs index addc181867..44b0582f6d 100644 --- a/types/src/v2/response.rs +++ b/types/src/v2/response.rs @@ -14,11 +14,22 @@ pub struct JsonRpcResponse<'a, T> { pub id: Option<&'a RawValue>, } -/// JSON-RPC notification response. +/// JSON-RPC subscription response. #[derive(Deserialize, Debug)] -pub struct JsonRpcNotifResponse { +pub struct JsonRpcSubscriptionResponse { /// JSON-RPC version. pub jsonrpc: TwoPointZero, /// Params. pub params: JsonRpcNotificationParamsAlloc, } + +/// JSON-RPC notification response. +#[derive(Deserialize, Debug)] +pub struct JsonRpcNotifResponse<'a, T> { + /// JSON-RPC version. + pub jsonrpc: TwoPointZero, + /// Method + pub method: &'a str, + /// Params. + pub params: T, +} diff --git a/ws-client/src/client.rs b/ws-client/src/client.rs index 08ef718da6..b420d7b448 100644 --- a/ws-client/src/client.rs +++ b/ws-client/src/client.rs @@ -25,18 +25,19 @@ // DEALINGS IN THE SOFTWARE. use crate::helpers::{ - build_unsubscribe_message, process_batch_response, process_error_response, process_single_response, - process_subscription_response, stop_subscription, + build_unsubscribe_message, process_batch_response, process_error_response, process_notification, + process_single_response, process_subscription_response, stop_subscription, }; use crate::traits::{Client, SubscriptionClient}; use crate::transport::{parse_url, Receiver as WsReceiver, Sender as WsSender, WsTransportClientBuilder}; use crate::v2::error::JsonRpcErrorAlloc; use crate::v2::params::{Id, JsonRpcParams}; use crate::v2::request::{JsonRpcCallSer, JsonRpcNotificationSer}; -use crate::v2::response::{JsonRpcNotifResponse, JsonRpcResponse}; +use crate::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponse}; use crate::TEN_MB_SIZE_BYTES; use crate::{ - manager::RequestManager, BatchMessage, Error, FrontToBack, RequestMessage, Subscription, SubscriptionMessage, + manager::RequestManager, BatchMessage, Error, FrontToBack, NotificationHandler, RegisterNotificationMessage, + RequestMessage, Subscription, SubscriptionMessage, }; use async_std::sync::Mutex; use async_trait::async_trait; @@ -455,6 +456,38 @@ impl SubscriptionClient for WsClient { }; Ok(Subscription { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData, id }) } + + /// Register a notification handler for async messages from the server. + /// + async fn register_notification<'a, N>(&self, method: &'a str) -> Result, Error> + where + N: DeserializeOwned, + { + log::trace!("[frontend]: register_notification: {:?}", method); + + let (send_back_tx, send_back_rx) = oneshot::channel(); + if self + .to_back + .clone() + .send(FrontToBack::RegisterNotification(RegisterNotificationMessage { + send_back: send_back_tx, + method: method.to_owned(), + })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = send_back_rx.await; + let (notifs_rx, method) = match res { + Ok(Ok(val)) => val, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + Ok(NotificationHandler { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData, method }) + } } /// Function being run in the background that processes messages from the frontend. @@ -550,6 +583,24 @@ async fn background_task( stop_subscription(&mut sender, &mut manager, unsub).await; } } + + // User called `register_notification` on the front-end. + Either::Left((Some(FrontToBack::RegisterNotification(reg)), _)) => { + log::trace!("[backend] registering notification handler: {:?}", reg.method); + let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); + + if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { + let _ = reg.send_back.send(Ok((subscribe_rx, reg.method))); + } else { + let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method))); + } + } + + // User dropped the notificationHandler for this method + Either::Left((Some(FrontToBack::UnregisterNotification(method)), _)) => { + log::trace!("[backend] unregistering notification handler: {:?}", method); + let _ = manager.remove_notification_handler(method); + } Either::Right((Some(Ok(raw)), _)) => { // Single response to a request. if let Ok(single) = serde_json::from_slice::>(&raw) { @@ -566,12 +617,17 @@ async fn background_task( } } // Subscription response. - else if let Ok(notif) = serde_json::from_slice::>(&raw) { + else if let Ok(notif) = serde_json::from_slice::>(&raw) { log::debug!("[backend]: recv subscription {:?}", notif); if let Err(Some(unsub)) = process_subscription_response(&mut manager, notif) { let _ = stop_subscription(&mut sender, &mut manager, unsub).await; } } + // Incoming Notification + else if let Ok(notif) = serde_json::from_slice::>(&raw) { + log::debug!("[backend]: recv notification {:?}", notif); + let _ = process_notification(&mut manager, notif); + } // Batch response. else if let Ok(batch) = serde_json::from_slice::>>(&raw) { log::debug!("[backend]: recv batch {:?}", batch); diff --git a/ws-client/src/helpers.rs b/ws-client/src/helpers.rs index 540b630095..26acad88a9 100644 --- a/ws-client/src/helpers.rs +++ b/ws-client/src/helpers.rs @@ -4,7 +4,7 @@ use futures::channel::mpsc; use jsonrpsee_types::v2::params::{Id, JsonRpcParams, SubscriptionId}; use jsonrpsee_types::v2::parse_request_id; use jsonrpsee_types::v2::request::JsonRpcCallSer; -use jsonrpsee_types::v2::response::{JsonRpcNotifResponse, JsonRpcResponse}; +use jsonrpsee_types::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponse}; use jsonrpsee_types::{v2::error::JsonRpcErrorAlloc, Error, RequestMessage}; use serde_json::Value as JsonValue; @@ -47,7 +47,7 @@ pub fn process_batch_response(manager: &mut RequestManager, rps: Vec, + notif: JsonRpcSubscriptionResponse, ) -> Result<(), Option> { let sub_id = notif.params.subscription; let request_id = match manager.get_request_id_by_subscription_id(&sub_id) { @@ -72,6 +72,27 @@ pub fn process_subscription_response( } } +/// Attempts to process an incoming notification +/// +/// Returns Ok() if the response was successfully handled +/// Returns Err() if there was no handler for the method +pub fn process_notification(manager: &mut RequestManager, notif: JsonRpcNotifResponse) -> Result<(), Error> { + match manager.as_notification_handler_mut(notif.method.to_owned()) { + Some(send_back_sink) => match send_back_sink.try_send(notif.params) { + Ok(()) => Ok(()), + Err(err) => { + log::error!("Error sending notification, dropping handler for {:?} error: {:?}", notif.method, err); + let _ = manager.remove_notification_handler(notif.method.to_owned()); + Err(Error::Internal(err.into_send_error())) + } + }, + None => { + log::error!("Notification: {:?} not a registered method", notif.method); + Err(Error::UnregisteredNotification(notif.method.to_owned())) + } + } +} + /// Process a response from the server. /// /// Returns `Ok(None)` if the response was successfully sent. diff --git a/ws-client/src/manager.rs b/ws-client/src/manager.rs index b92676bbb0..d141623734 100644 --- a/ws-client/src/manager.rs +++ b/ws-client/src/manager.rs @@ -57,6 +57,8 @@ pub struct RequestManager { subscriptions: HashMap, /// Pending batch requests batches: FnvHashMap, BatchState>, + /// Registered Methods for incoming notifications + notification_handlers: HashMap, } impl RequestManager { @@ -146,6 +148,25 @@ impl RequestManager { } } + /// Inserts a handler for incoming notifications + pub fn insert_notification_handler(&mut self, method: &str, send_back: SubscriptionSink) -> Result<(), Error> { + if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) { + handle.insert(send_back); + Ok(()) + } else { + Err(Error::MethodAlreadyRegistered(method.to_owned())) + } + } + + /// Removes a notification handler + pub fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> { + if let Some(_) = self.notification_handlers.remove(&method) { + Ok(()) + } else { + Err(Error::UnregisteredNotification(method.to_owned())) + } + } + /// Tries to complete a pending subscription. /// /// Returns `Some` if the subscription was completed otherwise `None`. @@ -240,6 +261,13 @@ impl RequestManager { } } + /// Get a mutable reference to underlying `Sink` in order to send incmoing notifications to the subscription. + /// + /// Returns `Some` if the `method` was registered as a NotificationHandler otherwise `None`. + pub fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> { + self.notification_handlers.get_mut(&method) + } + /// Reverse lookup to get the request ID for a subscription ID. /// /// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`. diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 422147bc62..e395bbc431 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -6,7 +6,7 @@ use crate::v2::{ }; use crate::{ traits::{Client, SubscriptionClient}, - Error, Subscription, WsClientBuilder, + Error, NotificationHandler, Subscription, WsClientBuilder, }; use jsonrpsee_test_utils::helpers::*; use jsonrpsee_test_utils::types::{Id, WebSocketTestServer}; @@ -81,6 +81,54 @@ async fn subscription_works() { } } +#[tokio::test] +async fn notification_handler_works() { + let server = WebSocketTestServer::with_hardcoded_notification( + "127.0.0.1:0".parse().unwrap(), + server_notification("test", "server originated notification works".into()), + ) + .await; + + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClientBuilder::default().build(&uri).await.unwrap(); + { + let mut nh: NotificationHandler = client.register_notification("test").await.unwrap(); + let response: String = nh.next().await.unwrap(); + assert_eq!("server originated notification works".to_owned(), response); + } +} + +#[tokio::test] +async fn notification_without_polling_doesnt_make_client_unuseable() { + let server = WebSocketTestServer::with_hardcoded_notification( + "127.0.0.1:0".parse().unwrap(), + server_notification("test", "server originated notification".into()), + ) + .await; + + let uri = to_ws_uri_string(server.local_addr()); + let client = WsClientBuilder::default().max_notifs_per_subscription(4).build(&uri).await.unwrap(); + let mut nh: NotificationHandler = client.register_notification("test").await.unwrap(); + + // don't poll the notification stream for 2 seconds, should be full now. + std::thread::sleep(std::time::Duration::from_secs(2)); + + // Capacity is `num_sender` + `capacity` + for _ in 0..5 { + assert!(nh.next().await.is_some()); + } + + // NOTE: this is now unuseable and unregistered. + assert!(nh.next().await.is_none()); + + // The same subscription should be possible to register again. + let mut other_nh: NotificationHandler = client.register_notification("test").await.unwrap(); + + // check that the new subscription works and the old one is still closed + assert!(other_nh.next().await.is_some()); + assert!(nh.next().await.is_none()); +} + #[tokio::test] async fn batch_request_works() { let batch_request = vec![