Skip to content

Commit

Permalink
Report response parse errors to avoid freezes
Browse files Browse the repository at this point in the history
  • Loading branch information
marmistrz committed May 31, 2022
1 parent 0a32ab5 commit c6d812d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 39 deletions.
14 changes: 13 additions & 1 deletion src/helpers.rs
Expand Up @@ -6,7 +6,7 @@ use futures::{
Future,
};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Deserialize};
use std::{marker::PhantomData, pin::Pin};

/// Takes any type which is deserializable from rpc::Value and such a value and
Expand Down Expand Up @@ -87,6 +87,18 @@ where
}
}

/// Extract the response id from slice. Used to obtain the response id if the deserialization of the whole response fails,
/// workraround for https://github.com/tomusdrw/rust-web3/issues/566
pub fn response_id_from_slice(response: &[u8]) -> Option<rpc::Id> {
#[derive(Deserialize)]
struct JustId {
id: rpc::Id,
}

let value: JustId = serde_json::from_slice(response).ok()?;
Some(value.id)
}

/// Parse bytes slice into JSON-RPC notification.
pub fn to_notification_from_slice(notification: &[u8]) -> error::Result<rpc::Notification> {
serde_json::from_slice(notification).map_err(|e| error::Error::InvalidResponse(format!("{:?}", e)))
Expand Down
114 changes: 76 additions & 38 deletions src/transports/ws.rs
Expand Up @@ -257,56 +257,94 @@ fn as_data_stream<T: Unpin + futures::AsyncRead + futures::AsyncWrite>(
})
}

enum ParsedMessage {
/// Represents a JSON-RPC notification
Notification(rpc::Notification),
/// Represents a valid JSON-RPC response
Response(rpc::Response),
/// Represents an invalid JSON-RPC response
InvalidResponse(rpc::Id),
}

fn parse_message(data: &[u8]) -> Option<ParsedMessage> {
if let Ok(notification) = helpers::to_notification_from_slice(data) {
Some(ParsedMessage::Notification(notification))
} else if let Ok(response) = helpers::to_response_from_slice(data) {
Some(ParsedMessage::Response(response))
} else if let Some(id) = helpers::response_id_from_slice(data) {
Some(ParsedMessage::InvalidResponse(id))
} else {
None
}
}

fn respond(id: rpc::Id, outputs: Result<Vec<rpc::Output>, Error>, pending: &mut BTreeMap<RequestId, Pending>) {
if let rpc::Id::Num(num) = id {
if let Some(request) = pending.remove(&(num as usize)) {
log::trace!("Responding to (id: {:?}) with {:?}", num, outputs);
let response = outputs.and_then(helpers::to_results_from_outputs);
if let Err(err) = request.send(response) {
log::warn!("Sending a response to deallocated channel: {:?}", err);
}
} else {
log::warn!("Got response for unknown request (id: {:?})", num);
}
} else {
log::warn!("Got unsupported response (id: {:?})", id);
}
}

fn handle_message(
data: &[u8],
subscriptions: &BTreeMap<SubscriptionId, Subscription>,
pending: &mut BTreeMap<RequestId, Pending>,
) {
log::trace!("Message received: {:?}", data);
if let Ok(notification) = helpers::to_notification_from_slice(data) {
if let rpc::Params::Map(params) = notification.params {
let id = params.get("subscription");
let result = params.get("result");

if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
let id: SubscriptionId = id.clone().into();
if let Some(stream) = subscriptions.get(&id) {
if let Err(e) = stream.unbounded_send(result.clone()) {
log::error!("Error sending notification: {:?} (id: {:?}", e, id);
log::trace!("Message received: {:?}", String::from_utf8_lossy(data));
match parse_message(data) {
Some(ParsedMessage::Notification(notification)) => {
if let rpc::Params::Map(params) = notification.params {
let id = params.get("subscription");
let result = params.get("result");
log::debug!("params={:#?}", params);

if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
let id: SubscriptionId = id.clone().into();
log::debug!("subscriptions={:#?}", subscriptions);

if let Some(stream) = subscriptions.get(&id) {
if let Err(e) = stream.unbounded_send(result.clone()) {
log::error!("Error sending notification: {:?} (id: {:?}", e, id);
}
} else {
log::warn!("Got notification for unknown subscription (id: {:?})", id);
}
} else {
log::warn!("Got notification for unknown subscription (id: {:?})", id);
log::error!("Got unsupported notification (id: {:?})", id);
}
} else {
log::error!("Got unsupported notification (id: {:?})", id);
}
}
} else {
let response = helpers::to_response_from_slice(data);
let outputs = match response {
Ok(rpc::Response::Single(output)) => vec![output],
Ok(rpc::Response::Batch(outputs)) => outputs,
_ => vec![],
};
Some(ParsedMessage::Response(response)) => {
let outputs = match response {
rpc::Response::Single(output) => vec![output],
rpc::Response::Batch(outputs) => outputs,
};

let id = match outputs.get(0) {
Some(&rpc::Output::Success(ref success)) => success.id.clone(),
Some(&rpc::Output::Failure(ref failure)) => failure.id.clone(),
None => rpc::Id::Num(0),
};
let id = match outputs.get(0) {
Some(&rpc::Output::Success(ref success)) => success.id.clone(),
Some(&rpc::Output::Failure(ref failure)) => failure.id.clone(),
None => rpc::Id::Num(0), // to handle empty batches
};

if let rpc::Id::Num(num) = id {
if let Some(request) = pending.remove(&(num as usize)) {
log::trace!("Responding to (id: {:?}) with {:?}", num, outputs);
if let Err(err) = request.send(helpers::to_results_from_outputs(outputs)) {
log::warn!("Sending a response to deallocated channel: {:?}", err);
}
} else {
log::warn!("Got response for unknown request (id: {:?})", num);
}
} else {
log::warn!("Got unsupported response (id: {:?})", id);
respond(id, Ok(outputs), pending);
}
Some(ParsedMessage::InvalidResponse(id)) => {
let error = Error::Decoder(String::from_utf8_lossy(data).to_string());
respond(id, Err(error), pending);
}
None => log::warn!(
"Got invalid response, which could not be parsed: {}",
String::from_utf8_lossy(data)
),
}
}

Expand Down Expand Up @@ -360,7 +398,7 @@ impl WebSocket {

fn send_request(&self, id: RequestId, request: rpc::Request) -> error::Result<oneshot::Receiver<BatchResult>> {
let request = helpers::to_string(&request);
log::debug!("[{}] Calling: {}", id, request);
log::debug!("[{}] Calling: {}, {:?}", id, request, request);
let (sender, receiver) = oneshot::channel();
self.send(TransportMessage::Request { id, request, sender })?;
Ok(receiver)
Expand Down

0 comments on commit c6d812d

Please sign in to comment.