Skip to content

Commit

Permalink
[ws client] RegisterNotification support (#303)
Browse files Browse the repository at this point in the history
* Rename NotifResponse to SubscriptionResponse to make room for new impl

* Add support for on_notification Subscription<T> types

* Fix handling of NotificationHandler in manager

* cleanup

* Implement NotificationHandler to replace Subscription<T> and clean up plumbing

* More cleanup

* impl Drop for NotificationHandler

* Address pr feedback #1

* ws client register_notification pr feedback 2

* Fix doc

* fix typo

* Add tests, get NH working

* More cleanup of String/&str

* fix doc

* Drop notification handler on send_back_sink error

* ws client notification auto unsubscribe when channel full test
  • Loading branch information
billylindeman committed May 12, 2021
1 parent 1549db4 commit c738447
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 18 deletions.
5 changes: 5 additions & 0 deletions test-utils/src/helpers.rs
Expand Up @@ -97,6 +97,11 @@ pub fn server_subscription_response(result: Value) -> String {
)
}

/// Server originated notification
pub fn server_notification(method: &str, params: Value) -> String {
format!(r#"{{"jsonrpc":"2.0","method":"{}", "params":{} }}"#, method, serde_json::to_string(&params).unwrap())
}

pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String> {
let client = hyper::Client::new();
let r = hyper::Request::post(uri)
Expand Down
46 changes: 39 additions & 7 deletions test-utils/src/types.rs
@@ -1,4 +1,5 @@
use futures_channel::mpsc::{self, Receiver, Sender};
use futures_channel::oneshot;
use futures_util::{
future::FutureExt,
io::{BufReader, BufWriter},
Expand Down Expand Up @@ -95,6 +96,8 @@ pub enum ServerMode {
Response(String),
// Send out a subscription ID on a request and continuously send out data on the subscription.
Subscription { subscription_id: String, subscription_response: String },
// Send out a notification after timeout
Notification(String),
}

/// JSONRPC v2 dummy WebSocket server that sends a hardcoded response.
Expand All @@ -114,6 +117,25 @@ impl WebSocketTestServer {
Self { local_addr, exit: tx }
}

// Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured `hardcoded notification` for every connection.
pub async fn with_hardcoded_notification(sockaddr: SocketAddr, notification: String) -> Self {
let (tx, rx) = mpsc::channel::<()>(1);
let (addr_tx, addr_rx) = oneshot::channel();

std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let listener = rt.block_on(async_std::net::TcpListener::bind(sockaddr)).unwrap();
let local_addr = listener.local_addr().unwrap();

addr_tx.send(local_addr).unwrap();
rt.block_on(server_backend(listener, rx, ServerMode::Notification(notification)));
});

let local_addr = addr_rx.await.unwrap();

Self { local_addr, exit: tx }
}

// Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured subscription ID and subscription response.
//
// NOTE: ignores the actual subscription and unsubscription method.
Expand Down Expand Up @@ -197,15 +219,24 @@ async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mu
loop {
let next_ws = ws_stream.next().fuse();
let next_exit = exit.next().fuse();
let time_out = tokio::time::sleep(Duration::from_secs(1)).fuse();
let time_out = tokio::time::sleep(Duration::from_millis(200)).fuse();

pin_mut!(time_out, next_exit, next_ws);

select! {
_ = time_out => {
if let ServerMode::Subscription { subscription_response, .. } = &mode {
if let Err(e) = sender.send_text(&subscription_response).await {
log::warn!("send response to subscription: {:?}", e);
}
match &mode {
ServerMode::Subscription { subscription_response, .. } => {
if let Err(e) = sender.send_text(&subscription_response).await {
log::warn!("send response to subscription: {:?}", e);
}
},
ServerMode::Notification(n) => {
if let Err(e) = sender.send_text(&n).await {
log::warn!("send notification: {:?}", e);
}
},
_ => {}
}
}
ws = next_ws => {
Expand All @@ -217,12 +248,13 @@ async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mu
if let Err(e) = sender.send_text(&r).await {
log::warn!("send response to request error: {:?}", e);
}
}
},
ServerMode::Subscription { subscription_id, .. } => {
if let Err(e) = sender.send_text(&subscription_id).await {
log::warn!("send subscription id error: {:?}", e);
}
}
},
_ => {}
}
}
}
Expand Down
58 changes: 58 additions & 0 deletions types/src/client.rs
Expand Up @@ -17,6 +17,18 @@ pub struct Subscription<Notif> {
pub marker: PhantomData<Notif>,
}

/// Active NotificationHandler on a Client.
pub struct NotificationHandler<Notif> {
/// Channel to send requests to the background task.
pub to_back: mpsc::Sender<FrontToBack>,
/// Channel from which we receive notifications from the server, as encoded `JsonValue`s.
pub notifs_rx: mpsc::Receiver<JsonValue>,
/// Method Name
pub method: String,
/// Marker in order to pin the `Notif` parameter.
pub marker: PhantomData<Notif>,
}

/// Batch request message.
#[derive(Debug)]
pub struct BatchMessage {
Expand Down Expand Up @@ -56,6 +68,17 @@ pub struct SubscriptionMessage {
pub send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>,
}

/// RegisterNotification message.
#[derive(Debug)]
pub struct RegisterNotificationMessage {
/// Method name this notification handler is attached to
pub method: String,
/// We return a [`mpsc::Receiver`] that will receive notifications.
/// When we get a response from the server about that subscription, we send the result over
/// this channel.
pub send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, String), Error>>,
}

/// Message that the Client can send to the background task.
#[derive(Debug)]
pub enum FrontToBack {
Expand All @@ -67,6 +90,10 @@ pub enum FrontToBack {
Request(RequestMessage),
/// Send a subscription request to the server.
Subscribe(SubscriptionMessage),
/// Register a notification handler
RegisterNotification(RegisterNotificationMessage),
/// Unregister a notification handler
UnregisterNotification(String),
/// When a subscription channel is closed, we send this message to the background
/// task to mark it ready for garbage collection.
// NOTE: It is not possible to cancel pending subscriptions or pending requests.
Expand Down Expand Up @@ -97,6 +124,28 @@ where
}
}

impl<Notif> NotificationHandler<Notif>
where
Notif: DeserializeOwned,
{
/// Returns the next notification from the stream
/// This may return `None` if the method has been unregistered,
/// may happen if the channel becomes full or is dropped.
///
/// Ignores any malformed packet.
pub async fn next(&mut self) -> Option<Notif> {
loop {
match self.notifs_rx.next().await {
Some(n) => match serde_json::from_value(n) {
Ok(parsed) => return Some(parsed),
Err(e) => log::debug!("NotificationHandler response error: {:?}", e),
},
None => return None,
}
}
}
}

impl<Notif> Drop for Subscription<Notif> {
fn drop(&mut self) {
// We can't actually guarantee that this goes through. If the background task is busy, then
Expand All @@ -107,3 +156,12 @@ impl<Notif> Drop for Subscription<Notif> {
let _ = self.to_back.send(FrontToBack::SubscriptionClosed(id)).now_or_never();
}
}

impl<Notif> Drop for NotificationHandler<Notif> {
fn drop(&mut self) {
// We can't actually guarantee that this goes through. If the background task is busy, then
// the channel's buffer will be full, and our unregister request will never make it.
let notif_method = std::mem::take(&mut self.method);
let _ = self.to_back.send(FrontToBack::UnregisterNotification(notif_method)).now_or_never();
}
}
3 changes: 3 additions & 0 deletions types/src/error.rs
Expand Up @@ -67,6 +67,9 @@ pub enum Error {
/// Invalid request ID.
#[error("Invalid request ID")]
InvalidRequestId,
/// Client received a notification with an unregistered method
#[error("Unregistered notification method")]
UnregisteredNotification(String),
/// A request with the same request ID has already been registered.
#[error("A request with the same request ID has already been registered")]
DuplicateRequestId,
Expand Down
7 changes: 6 additions & 1 deletion types/src/traits.rs
@@ -1,5 +1,5 @@
use crate::v2::params::{JsonRpcParams, RpcParams};
use crate::{Error, Subscription};
use crate::{Error, NotificationHandler, Subscription};
use async_trait::async_trait;
use serde::de::DeserializeOwned;

Expand Down Expand Up @@ -44,6 +44,11 @@ pub trait SubscriptionClient: Client {
) -> Result<Subscription<Notif>, Error>
where
Notif: DeserializeOwned;

/// Register a NotificationHandler<Notif> that will listen for incoming JSON-RPC notifications
async fn register_notification<'a, Notif>(&self, method: &'a str) -> Result<NotificationHandler<Notif>, Error>
where
Notif: DeserializeOwned;
}

/// JSON-RPC server interface for managing method calls.
Expand Down
15 changes: 13 additions & 2 deletions types/src/v2/response.rs
Expand Up @@ -14,11 +14,22 @@ pub struct JsonRpcResponse<'a, T> {
pub id: Option<&'a RawValue>,
}

/// JSON-RPC notification response.
/// JSON-RPC subscription response.
#[derive(Deserialize, Debug)]
pub struct JsonRpcNotifResponse<T> {
pub struct JsonRpcSubscriptionResponse<T> {
/// JSON-RPC version.
pub jsonrpc: TwoPointZero,
/// Params.
pub params: JsonRpcNotificationParamsAlloc<T>,
}

/// JSON-RPC notification response.
#[derive(Deserialize, Debug)]
pub struct JsonRpcNotifResponse<'a, T> {
/// JSON-RPC version.
pub jsonrpc: TwoPointZero,
/// Method
pub method: &'a str,
/// Params.
pub params: T,
}
66 changes: 61 additions & 5 deletions ws-client/src/client.rs
Expand Up @@ -25,18 +25,19 @@
// DEALINGS IN THE SOFTWARE.

use crate::helpers::{
build_unsubscribe_message, process_batch_response, process_error_response, process_single_response,
process_subscription_response, stop_subscription,
build_unsubscribe_message, process_batch_response, process_error_response, process_notification,
process_single_response, process_subscription_response, stop_subscription,
};
use crate::traits::{Client, SubscriptionClient};
use crate::transport::{parse_url, Receiver as WsReceiver, Sender as WsSender, WsTransportClientBuilder};
use crate::v2::error::JsonRpcErrorAlloc;
use crate::v2::params::{Id, JsonRpcParams};
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotificationSer};
use crate::v2::response::{JsonRpcNotifResponse, JsonRpcResponse};
use crate::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponse};
use crate::TEN_MB_SIZE_BYTES;
use crate::{
manager::RequestManager, BatchMessage, Error, FrontToBack, RequestMessage, Subscription, SubscriptionMessage,
manager::RequestManager, BatchMessage, Error, FrontToBack, NotificationHandler, RegisterNotificationMessage,
RequestMessage, Subscription, SubscriptionMessage,
};
use async_std::sync::Mutex;
use async_trait::async_trait;
Expand Down Expand Up @@ -455,6 +456,38 @@ impl SubscriptionClient for WsClient {
};
Ok(Subscription { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData, id })
}

/// Register a notification handler for async messages from the server.
///
async fn register_notification<'a, N>(&self, method: &'a str) -> Result<NotificationHandler<N>, Error>
where
N: DeserializeOwned,
{
log::trace!("[frontend]: register_notification: {:?}", method);

let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
.clone()
.send(FrontToBack::RegisterNotification(RegisterNotificationMessage {
send_back: send_back_tx,
method: method.to_owned(),
}))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let (notifs_rx, method) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};

Ok(NotificationHandler { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData, method })
}
}

/// Function being run in the background that processes messages from the frontend.
Expand Down Expand Up @@ -550,6 +583,24 @@ async fn background_task(
stop_subscription(&mut sender, &mut manager, unsub).await;
}
}

// User called `register_notification` on the front-end.
Either::Left((Some(FrontToBack::RegisterNotification(reg)), _)) => {
log::trace!("[backend] registering notification handler: {:?}", reg.method);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);

if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method)));
}
}

// User dropped the notificationHandler for this method
Either::Left((Some(FrontToBack::UnregisterNotification(method)), _)) => {
log::trace!("[backend] unregistering notification handler: {:?}", method);
let _ = manager.remove_notification_handler(method);
}
Either::Right((Some(Ok(raw)), _)) => {
// Single response to a request.
if let Ok(single) = serde_json::from_slice::<JsonRpcResponse<_>>(&raw) {
Expand All @@ -566,12 +617,17 @@ async fn background_task(
}
}
// Subscription response.
else if let Ok(notif) = serde_json::from_slice::<JsonRpcNotifResponse<_>>(&raw) {
else if let Ok(notif) = serde_json::from_slice::<JsonRpcSubscriptionResponse<_>>(&raw) {
log::debug!("[backend]: recv subscription {:?}", notif);
if let Err(Some(unsub)) = process_subscription_response(&mut manager, notif) {
let _ = stop_subscription(&mut sender, &mut manager, unsub).await;
}
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_slice::<JsonRpcNotifResponse<_>>(&raw) {
log::debug!("[backend]: recv notification {:?}", notif);
let _ = process_notification(&mut manager, notif);
}
// Batch response.
else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcResponse<_>>>(&raw) {
log::debug!("[backend]: recv batch {:?}", batch);
Expand Down
25 changes: 23 additions & 2 deletions ws-client/src/helpers.rs
Expand Up @@ -4,7 +4,7 @@ use futures::channel::mpsc;
use jsonrpsee_types::v2::params::{Id, JsonRpcParams, SubscriptionId};
use jsonrpsee_types::v2::parse_request_id;
use jsonrpsee_types::v2::request::JsonRpcCallSer;
use jsonrpsee_types::v2::response::{JsonRpcNotifResponse, JsonRpcResponse};
use jsonrpsee_types::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponse};
use jsonrpsee_types::{v2::error::JsonRpcErrorAlloc, Error, RequestMessage};
use serde_json::Value as JsonValue;

Expand Down Expand Up @@ -47,7 +47,7 @@ pub fn process_batch_response(manager: &mut RequestManager, rps: Vec<JsonRpcResp
/// Returns `Err(Some(msg))` if the subscription was full.
pub fn process_subscription_response(
manager: &mut RequestManager,
notif: JsonRpcNotifResponse<JsonValue>,
notif: JsonRpcSubscriptionResponse<JsonValue>,
) -> Result<(), Option<RequestMessage>> {
let sub_id = notif.params.subscription;
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Expand All @@ -72,6 +72,27 @@ pub fn process_subscription_response(
}
}

/// Attempts to process an incoming notification
///
/// Returns Ok() if the response was successfully handled
/// Returns Err() if there was no handler for the method
pub fn process_notification(manager: &mut RequestManager, notif: JsonRpcNotifResponse<JsonValue>) -> Result<(), Error> {
match manager.as_notification_handler_mut(notif.method.to_owned()) {
Some(send_back_sink) => match send_back_sink.try_send(notif.params) {
Ok(()) => Ok(()),
Err(err) => {
log::error!("Error sending notification, dropping handler for {:?} error: {:?}", notif.method, err);
let _ = manager.remove_notification_handler(notif.method.to_owned());
Err(Error::Internal(err.into_send_error()))
}
},
None => {
log::error!("Notification: {:?} not a registered method", notif.method);
Err(Error::UnregisteredNotification(notif.method.to_owned()))
}
}
}

/// Process a response from the server.
///
/// Returns `Ok(None)` if the response was successfully sent.
Expand Down

0 comments on commit c738447

Please sign in to comment.