Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report response parse errors to avoid freezes #639

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/helpers.rs
Expand Up @@ -6,7 +6,7 @@ use futures::{
Future,
};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Deserialize};
use std::{marker::PhantomData, pin::Pin};

/// Takes any type which is deserializable from rpc::Value and such a value and
Expand Down Expand Up @@ -87,6 +87,18 @@ where
}
}

/// Extract the response id from bytes slice. Used to obtain the response id if the deserialization of the whole response fails,
/// workaround for https://github.com/tomusdrw/rust-web3/issues/566
pub fn response_id_from_slice(response: &[u8]) -> Option<rpc::Id> {
#[derive(Deserialize)]
struct JustId {
id: rpc::Id,
}

let value: JustId = serde_json::from_slice(response).ok()?;
Some(value.id)
}

/// Parse bytes slice into JSON-RPC notification.
pub fn to_notification_from_slice(notification: &[u8]) -> error::Result<rpc::Notification> {
serde_json::from_slice(notification).map_err(|e| error::Error::InvalidResponse(format!("{:?}", e)))
Expand Down
114 changes: 76 additions & 38 deletions src/transports/ws.rs
Expand Up @@ -257,56 +257,94 @@ fn as_data_stream<T: Unpin + futures::AsyncRead + futures::AsyncWrite>(
})
}

enum ParsedMessage {
/// Represents a JSON-RPC notification
Notification(rpc::Notification),
/// Represents a valid JSON-RPC response
Response(rpc::Response),
/// Represents an malformed JSON-RPC response, although containing the request `id`
InvalidResponse(rpc::Id),
}

fn parse_message(data: &[u8]) -> Option<ParsedMessage> {
if let Ok(notification) = helpers::to_notification_from_slice(data) {
Some(ParsedMessage::Notification(notification))
} else if let Ok(response) = helpers::to_response_from_slice(data) {
Some(ParsedMessage::Response(response))
} else if let Some(id) = helpers::response_id_from_slice(data) {
Some(ParsedMessage::InvalidResponse(id))
} else {
None
}
}
Comment on lines +269 to +279
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it might be better to simply use untagged enum ParsedMessage with default serde deserialize behaviour? Given some tests are written I think it should be simpler to reason about and possibly also faster than manually attempting to parse the response couple of times.


fn respond(id: rpc::Id, outputs: Result<Vec<rpc::Output>, Error>, pending: &mut BTreeMap<RequestId, Pending>) {
if let rpc::Id::Num(num) = id {
if let Some(request) = pending.remove(&(num as usize)) {
log::trace!("Responding to (id: {:?}) with {:?}", num, outputs);
let response = outputs.and_then(helpers::to_results_from_outputs);
if let Err(err) = request.send(response) {
log::warn!("Sending a response to deallocated channel: {:?}", err);
}
} else {
log::warn!("Got response for unknown request (id: {:?})", num);
}
} else {
log::warn!("Got unsupported response (id: {:?})", id);
}
}

fn handle_message(
data: &[u8],
subscriptions: &BTreeMap<SubscriptionId, Subscription>,
pending: &mut BTreeMap<RequestId, Pending>,
) {
log::trace!("Message received: {:?}", data);
if let Ok(notification) = helpers::to_notification_from_slice(data) {
if let rpc::Params::Map(params) = notification.params {
let id = params.get("subscription");
let result = params.get("result");

if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
let id: SubscriptionId = id.clone().into();
if let Some(stream) = subscriptions.get(&id) {
if let Err(e) = stream.unbounded_send(result.clone()) {
log::error!("Error sending notification: {:?} (id: {:?}", e, id);
log::trace!("Message received: {:?}", String::from_utf8_lossy(data));
match parse_message(data) {
Some(ParsedMessage::Notification(notification)) => {
if let rpc::Params::Map(params) = notification.params {
let id = params.get("subscription");
let result = params.get("result");
log::debug!("params={:#?}", params);

if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
let id: SubscriptionId = id.clone().into();
log::debug!("subscriptions={:#?}", subscriptions);

if let Some(stream) = subscriptions.get(&id) {
if let Err(e) = stream.unbounded_send(result.clone()) {
log::error!("Error sending notification: {:?} (id: {:?}", e, id);
}
} else {
log::warn!("Got notification for unknown subscription (id: {:?})", id);
}
} else {
log::warn!("Got notification for unknown subscription (id: {:?})", id);
log::error!("Got unsupported notification (id: {:?})", id);
}
} else {
log::error!("Got unsupported notification (id: {:?})", id);
}
}
} else {
let response = helpers::to_response_from_slice(data);
let outputs = match response {
Ok(rpc::Response::Single(output)) => vec![output],
Ok(rpc::Response::Batch(outputs)) => outputs,
_ => vec![],
};
Some(ParsedMessage::Response(response)) => {
let outputs = match response {
rpc::Response::Single(output) => vec![output],
rpc::Response::Batch(outputs) => outputs,
};

let id = match outputs.get(0) {
Some(&rpc::Output::Success(ref success)) => success.id.clone(),
Some(&rpc::Output::Failure(ref failure)) => failure.id.clone(),
None => rpc::Id::Num(0),
};
let id = match outputs.get(0) {
Some(&rpc::Output::Success(ref success)) => success.id.clone(),
Some(&rpc::Output::Failure(ref failure)) => failure.id.clone(),
None => rpc::Id::Num(0), // to handle empty batches
};

if let rpc::Id::Num(num) = id {
if let Some(request) = pending.remove(&(num as usize)) {
log::trace!("Responding to (id: {:?}) with {:?}", num, outputs);
if let Err(err) = request.send(helpers::to_results_from_outputs(outputs)) {
log::warn!("Sending a response to deallocated channel: {:?}", err);
}
} else {
log::warn!("Got response for unknown request (id: {:?})", num);
}
} else {
log::warn!("Got unsupported response (id: {:?})", id);
respond(id, Ok(outputs), pending);
}
Some(ParsedMessage::InvalidResponse(id)) => {
let error = Error::Decoder(String::from_utf8_lossy(data).to_string());
respond(id, Err(error), pending);
}
None => log::warn!(
"Got invalid response, which could not be parsed: {}",
String::from_utf8_lossy(data)
),
}
}

Expand Down Expand Up @@ -360,7 +398,7 @@ impl WebSocket {

fn send_request(&self, id: RequestId, request: rpc::Request) -> error::Result<oneshot::Receiver<BatchResult>> {
let request = helpers::to_string(&request);
log::debug!("[{}] Calling: {}", id, request);
log::debug!("[{}] Calling: {}, {:?}", id, request, request);
let (sender, receiver) = oneshot::channel();
self.send(TransportMessage::Request { id, request, sender })?;
Ok(receiver)
Expand Down