Skip to content

Commit

Permalink
fix: better error reporting in graphql connector (#2525)
Browse files Browse the repository at this point in the history
A customer is running into problems with `@graphql` - they get a
"received invalid response from upstream server" when they try to use
it, and there's no way to figure out what the invalid response was and
therefore how to fix it.

This PR updates the connector to print the response out to logs when it
encounters an error.

While I was there I made a few improvements to the response handling:

- If there's an HTTP error status _and_ we can't decode the response,
then we surface the HTTP status to users.
- If there's an HTTP error but the request is otherwise parseable then
we carry on as normal, with an additional error
- I've also made response deserializing a bit more strict - if there's
neither data or error that's a malformed response.

We might need to do more work around this, but I want to get this out
before the end of the day so this'll hopefully do?
  • Loading branch information
obmarg committed Jul 26, 2023
1 parent 75e30d1 commit ffd69f9
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 20 deletions.
45 changes: 25 additions & 20 deletions common/dynaql/src/registry/resolvers/graphql.rs
Expand Up @@ -25,6 +25,7 @@
#![deny(unused)]
#![deny(rustdoc::all)]

mod response;
pub mod serializer;

use std::{
Expand All @@ -48,7 +49,7 @@ use crate::{
ServerError,
};

use self::serializer::Serializer;
use self::{response::UpstreamResponse, serializer::Serializer};

use super::ResolvedValue;

Expand Down Expand Up @@ -105,7 +106,7 @@ impl Resolver {
headers: &[(&str, &str)],
fragment_definitions: HashMap<&'a Name, &'a FragmentDefinition>,
target: Target<'a>,
error_handler: impl FnMut(ServerError) + 'a,
mut error_handler: impl FnMut(ServerError) + 'a,
variables: Variables,
variable_definitions: HashMap<&'a Name, &'a VariableDefinition>,
registry: &'a Registry,
Expand Down Expand Up @@ -152,27 +153,25 @@ impl Resolver {
})
.collect();

let mut value = request_builder
let response = request_builder
.json(&Query { query, variables })
.send()
.await?
.json::<serde_json::Value>()
.await?
.take();

// Merge any upstream GraphQL errors.
if let Some(errors) = value.get_mut("errors") {
serde_json::from_value(errors.take())
.map_err(|_| Error::MalformedUpstreamResponse)
.map(|errors: Vec<ServerError>| {
errors.into_iter().for_each(error_handler);
})?;
.await?;

let http_status = response.status();

let UpstreamResponse { mut data, errors } =
UpstreamResponse::from_response_text(http_status, response.text().await)?;

if !http_status.is_success() {
// If we haven't had a fatal error we should still report the http error
error_handler(ServerError::new(
format!("Remote returned http error code: {http_status}"),
None,
));
}

let mut data = match value.get_mut("data") {
Some(value) => value.take(),
None => serde_json::Value::Null,
};
errors.into_iter().for_each(error_handler);

if let Some(prefix) = prefix {
prefix_result_typename(&mut data, &prefix);
Expand All @@ -181,7 +180,7 @@ impl Resolver {
let mut resolved_value = ResolvedValue::new(Arc::new(match wrapping_field {
Some(field) => data
.as_object_mut()
.and_then(|m| m.get_mut(&field).map(serde_json::Value::take))
.and_then(|m| m.remove(&field))
.unwrap_or(serde_json::Value::Null),
None => data,
}));
Expand All @@ -206,8 +205,14 @@ pub enum Error {
#[error("request to upstream server failed: {0}")]
RequestError(#[from] reqwest::Error),

#[error("couldnt decode JSON from upstream server: {0}")]
JsonDecodeError(#[from] serde_json::Error),

#[error("received invalid response from upstream server")]
MalformedUpstreamResponse,

#[error("received an unexpected status from the downstream server: {0}")]
HttpErrorResponse(u16),
}

/// Before the resolver returns the JSON to the caller, it needs to iterate the JSON, find any
Expand Down
176 changes: 176 additions & 0 deletions common/dynaql/src/registry/resolvers/graphql/response.rs
@@ -0,0 +1,176 @@
use http::StatusCode;
use serde_json::Value;

use crate::ServerError;

use super::Error;

#[derive(PartialEq, Debug)]
#[allow(clippy::module_name_repetitions)] // Honestly clippy, get fucked
pub struct UpstreamResponse {
pub data: serde_json::Value,
pub errors: Vec<ServerError>,
}

impl UpstreamResponse {
pub fn from_response_text(
http_status: StatusCode,
response_text_result: Result<String, impl Into<Error>>,
) -> Result<Self, Error> {
let response_text = response_text_result
.map_err(|error| handle_error_after_response(http_status, error, None))?;

serde_json::from_str::<UpstreamResponse>(&response_text)
.map_err(|error| handle_error_after_response(http_status, error, Some(response_text)))
}
}

impl<'de> serde::Deserialize<'de> for UpstreamResponse {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;

#[derive(serde::Deserialize)]
struct ResponseDeser {
/// The operation data (if the operation was successful)
data: Option<Value>,

/// Any errors that occurred as part of this operation
errors: Option<Vec<ServerError>>,
}

let ResponseDeser { data, errors } = ResponseDeser::deserialize(deserializer)?;

if data.is_none() && errors.is_none() {
return Err(D::Error::custom(
"neither data or errors were present in the upstream response",
));
}

Ok(UpstreamResponse {
data: data.unwrap_or(Value::Null),
errors: errors.unwrap_or_default(),
})
}
}

/// If we encountered an error handling a GraphQL response then we want to log the
/// body for debugging purposes. But we also want to check the HTTP status code
/// and use that as our primary error if it's not success.
fn handle_error_after_response(
status: StatusCode,
error: impl Into<Error>,
#[allow(unused)] response_body: Option<String>,
) -> Error {
let error = error.into();
#[cfg(feature = "tracing_worker")]
{
logworker::debug!("", "Error in GraphQL connector: {error}");
if let Some(text) = response_body {
logworker::debug!("", "Response Body: {text}");
}
}

if !status.is_success() {
return Error::HttpErrorResponse(status.as_u16());
}

error
}

#[cfg(test)]
mod tests {
use rstest::rstest;
use serde_json::json;

use super::*;

#[rstest]
#[case(200, r#"{"data": {}}"#, UpstreamResponse {
data: json!({}),
errors: vec![]
})]
#[case(200, r#"{"errors": []}"#, UpstreamResponse {
data: json!(null),
errors: vec![]
})]
#[case(200, r#"
{"errors": [{
"message": "oh no"
}]}
"#, UpstreamResponse {
data: json!(null),
errors: vec![ServerError {
message: "oh no".into(),
source: None,
locations: vec![],
path: vec![],
extensions: None
}]
})]
#[case(500, r#"
{"errors": [{
"message": "oh no"
}]}
"#, UpstreamResponse {
data: json!(null),
errors: vec![ServerError {
message: "oh no".into(),
source: None,
locations: vec![],
path: vec![],
extensions: None
}]
})]
#[case(200, r#"{"data": {}}"#, UpstreamResponse {
data: json!({}),
errors: vec![]
})]
fn test_happy_paths(
#[case] status_code: u16,
#[case] text: &str,
#[case] expected_response: UpstreamResponse,
) {
assert_eq!(
UpstreamResponse::from_response_text(
StatusCode::from_u16(status_code).unwrap(),
Ok::<_, Error>(text.to_string())
)
.unwrap(),
expected_response
);
}

#[test]
fn test_error_paths() {
let response = UpstreamResponse::from_response_text(
StatusCode::from_u16(500).unwrap(),
Ok::<_, Error>(r#"{"blah": {}}"#.into()),
)
.unwrap_err();
assert!(
matches!(response, Error::HttpErrorResponse(500)),
"`{response}` does not match"
);
let response = UpstreamResponse::from_response_text(
StatusCode::from_u16(200).unwrap(),
Ok::<_, Error>(r#"{"blah": {}}"#.into()),
)
.unwrap_err();
assert!(
matches!(response, Error::JsonDecodeError(_)),
"{response} does not match"
);
let response = UpstreamResponse::from_response_text(
StatusCode::from_u16(200).unwrap(),
Ok::<_, Error>(r#"{"errors": "bad"}"#.into()),
)
.unwrap_err();
assert!(
matches!(response, Error::JsonDecodeError(_)),
"{response} does not match"
);
}
}

0 comments on commit ffd69f9

Please sign in to comment.