Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Variable deduplication in query planner #872

Merged
merged 17 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ Description! And a link to a [reference](http://url)

Rhai plugins can now interact with Context::upsert(). We provide an example (rhai-surrogate-cache-key) to illustrate its use.

### Add an experimental optimization to deduplicate variables in query planner [PR #872](https://github.com/apollographql/router/pull/872)
Get rid of duplicated variables in requests and responses of the query planner. This optimization is disabled by default, if you want to enable it you just need override your configuration:

```yaml title="router.yaml"
plugins:
experimental.traffic_shaping:
variables_deduplication: true # Enable the variables deduplication optimization
```

### Measure APQ cache hits and registers ([PR #1117](https://github.com/apollographql/router/pull/1117))

The APQ layer will now report cache hits and misses to Apollo Studio if telemetry is configured
Expand All @@ -50,8 +59,7 @@ Description! And a link to a [reference](http://url)
### Content-Type is application/json ([1154](https://github.com/apollographql/router/issues/1154))
The router was not setting a content-type on results. This fix ensures that a content-type of application/json is added when returning a graphql response.

- **Prevent memory leaks when tasks are cancelled** [PR #767](https://github.com/apollographql/router/pull/767)

### Prevent memory leaks when tasks are cancelled [PR #767](https://github.com/apollographql/router/pull/767)
Cancelling a request could put the router in an unresponsive state where the deduplication layer or cache would make subgraph requests hang.

## 🛠 Maintenance
Expand Down
4 changes: 2 additions & 2 deletions apollo-router-core/src/json_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ where
/// A GraphQL path element that is composes of strings or numbers.
/// e.g `/book/3/name`
#[doc(hidden)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
#[serde(untagged)]
pub enum PathElement {
/// A path element that given an array will flatmap the content.
Expand Down Expand Up @@ -424,7 +424,7 @@ where
///
/// This can be composed of strings and numbers
#[doc(hidden)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default, Hash)]
#[serde(transparent)]
pub struct Path(pub Vec<PathElement>);

Expand Down
1 change: 0 additions & 1 deletion apollo-router-core/src/plugins/include_subgraph_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ mod test {
);

let builder = PluggableRouterServiceBuilder::new(schema.clone());

let builder = builder
.with_dyn_plugin("experimental.include_subgraph_errors".to_string(), plugin)
.with_subgraph_service("accounts", account_service.clone())
Expand Down
152 changes: 146 additions & 6 deletions apollo-router-core/src/plugins/traffic_shaping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ use tower::{BoxError, ServiceBuilder, ServiceExt};

use crate::plugin::Plugin;
use crate::plugins::traffic_shaping::deduplication::QueryDeduplicationLayer;
use crate::{register_plugin, ServiceBuilderExt, SubgraphRequest, SubgraphResponse};
use crate::{
register_plugin, QueryPlannerRequest, QueryPlannerResponse, ServiceBuilderExt, SubgraphRequest,
SubgraphResponse,
};

#[derive(PartialEq, Debug, Clone, Deserialize, JsonSchema)]
struct Shaping {
dedup: Option<bool>,
/// Enable query deduplication
query_deduplication: Option<bool>,
}

impl Shaping {
fn merge(&self, fallback: Option<&Shaping>) -> Shaping {
match fallback {
None => self.clone(),
Some(fallback) => Shaping {
dedup: self.dedup.or(fallback.dedup),
query_deduplication: self.query_deduplication.or(fallback.query_deduplication),
},
}
}
Expand All @@ -44,6 +48,8 @@ struct Config {
all: Option<Shaping>,
#[serde(default)]
subgraphs: HashMap<String, Shaping>,
/// Enable variable deduplication optimization when sending requests to subgraphs (https://github.com/apollographql/router/issues/87)
variables_deduplication: Option<bool>,
}

struct TrafficShaping {
Expand All @@ -70,7 +76,7 @@ impl Plugin for TrafficShaping {

if let Some(config) = final_config {
ServiceBuilder::new()
.option_layer(config.dedup.unwrap_or_default().then(|| {
.option_layer(config.query_deduplication.unwrap_or_default().then(|| {
//Buffer is required because dedup layer requires a clone service.
ServiceBuilder::new()
.layer(QueryDeduplicationLayer::default())
Expand All @@ -82,6 +88,22 @@ impl Plugin for TrafficShaping {
service
}
}

fn query_planning_service(
&mut self,
service: BoxService<QueryPlannerRequest, QueryPlannerResponse, BoxError>,
) -> BoxService<QueryPlannerRequest, QueryPlannerResponse, BoxError> {
if matches!(self.config.variables_deduplication, Some(true)) {
service
.map_request(|mut req: QueryPlannerRequest| {
req.query_plan_options.enable_variable_deduplication = true;
req
})
.boxed()
} else {
service
}
}
}

impl TrafficShaping {
Expand All @@ -98,17 +120,135 @@ register_plugin!("experimental", "traffic_shaping", TrafficShaping);

#[cfg(test)]
mod test {
use std::sync::Arc;

use once_cell::sync::Lazy;
use serde_json_bytes::{ByteString, Value};
use tower::{util::BoxCloneService, Service};

use crate::{
utils::test::mock::subgraph::MockSubgraph, DynPlugin, Object,
PluggableRouterServiceBuilder, ResponseBody, RouterRequest, RouterResponse, Schema,
};

use super::*;

static EXPECTED_RESPONSE: Lazy<ResponseBody> = Lazy::new(|| {
ResponseBody::GraphQL(serde_json::from_str(r#"{"data":{"topProducts":[{"upc":"1","name":"Table","reviews":[{"id":"1","product":{"name":"Table"},"author":{"id":"1","name":"Ada Lovelace"}},{"id":"4","product":{"name":"Table"},"author":{"id":"2","name":"Alan Turing"}}]},{"upc":"2","name":"Couch","reviews":[{"id":"2","product":{"name":"Couch"},"author":{"id":"1","name":"Ada Lovelace"}}]}]}}"#).unwrap())
});

static VALID_QUERY: &str = r#"query TopProducts($first: Int) { topProducts(first: $first) { upc name reviews { id product { name } author { id name } } } }"#;

async fn execute_router_test(
query: &str,
body: &ResponseBody,
mut router_service: BoxCloneService<RouterRequest, RouterResponse, BoxError>,
) {
let request = RouterRequest::fake_builder()
.query(query.to_string())
.variable("first", 2usize)
.build()
.expect("expecting valid request");

let response = router_service
.ready()
.await
.unwrap()
.call(request)
.await
.unwrap();
assert_eq!(response.response.body(), body);
}

async fn build_mock_router_with_variable_dedup_optimization(
plugin: Box<dyn DynPlugin>,
) -> BoxCloneService<RouterRequest, RouterResponse, BoxError> {
let mut extensions = Object::new();
extensions.insert("test", Value::String(ByteString::from("value")));

let account_mocks = vec![
(
r#"{"query":"query TopProducts__accounts__3($representations:[_Any!]!){_entities(representations:$representations){...on User{name}}}","operationName":"TopProducts__accounts__3","variables":{"representations":[{"__typename":"User","id":"1"},{"__typename":"User","id":"2"}]}}"#,
r#"{"data":{"_entities":[{"name":"Ada Lovelace"},{"name":"Alan Turing"}]}}"#
)
].into_iter().map(|(query, response)| (serde_json::from_str(query).unwrap(), serde_json::from_str(response).unwrap())).collect();
let account_service = MockSubgraph::new(account_mocks);

let review_mocks = vec![
(
r#"{"query":"query TopProducts__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on Product{reviews{id product{__typename upc}author{__typename id}}}}}","operationName":"TopProducts__reviews__1","variables":{"representations":[{"__typename":"Product","upc":"1"},{"__typename":"Product","upc":"2"}]}}"#,
r#"{"data":{"_entities":[{"reviews":[{"id":"1","product":{"__typename":"Product","upc":"1"},"author":{"__typename":"User","id":"1"}},{"id":"4","product":{"__typename":"Product","upc":"1"},"author":{"__typename":"User","id":"2"}}]},{"reviews":[{"id":"2","product":{"__typename":"Product","upc":"2"},"author":{"__typename":"User","id":"1"}}]}]}}"#
)
].into_iter().map(|(query, response)| (serde_json::from_str(query).unwrap(), serde_json::from_str(response).unwrap())).collect();
let review_service = MockSubgraph::new(review_mocks);

let product_mocks = vec![
(
r#"{"query":"query TopProducts__products__0($first:Int){topProducts(first:$first){__typename upc name}}","operationName":"TopProducts__products__0","variables":{"first":2}}"#,
r#"{"data":{"topProducts":[{"__typename":"Product","upc":"1","name":"Table"},{"__typename":"Product","upc":"2","name":"Couch"}]}}"#
),
(
r#"{"query":"query TopProducts__products__2($representations:[_Any!]!){_entities(representations:$representations){...on Product{name}}}","operationName":"TopProducts__products__2","variables":{"representations":[{"__typename":"Product","upc":"1"},{"__typename":"Product","upc":"2"}]}}"#,
r#"{"data":{"_entities":[{"name":"Table"},{"name":"Couch"}]}}"#
)
].into_iter().map(|(query, response)| (serde_json::from_str(query).unwrap(), serde_json::from_str(response).unwrap())).collect();

let product_service = MockSubgraph::new(product_mocks).with_extensions(extensions);

let schema: Arc<Schema> = Arc::new(
include_str!(
"../../../../apollo-router-benchmarks/benches/fixtures/supergraph.graphql"
)
.parse()
.unwrap(),
);

let builder = PluggableRouterServiceBuilder::new(schema.clone());

let builder = builder
.with_dyn_plugin("experimental.traffic_shaping".to_string(), plugin)
.with_subgraph_service("accounts", account_service.clone())
.with_subgraph_service("reviews", review_service.clone())
.with_subgraph_service("products", product_service.clone());

let (router, _) = builder.build().await.expect("should build");

router
}

async fn get_taffic_shaping_plugin(config: &serde_json::Value) -> Box<dyn DynPlugin> {
// Build a redacting plugin
crate::plugins()
.get("experimental.traffic_shaping")
.expect("Plugin not found")
.create_instance(config)
.await
.expect("Plugin not created")
}

#[tokio::test]
async fn it_returns_valid_response_for_deduplicated_variables() {
let config = serde_yaml::from_str::<serde_json::Value>(
r#"
variables_deduplication: true
"#,
)
.unwrap();
// Build a redacting plugin
let plugin = get_taffic_shaping_plugin(&config).await;
let router = build_mock_router_with_variable_dedup_optimization(plugin).await;
execute_router_test(VALID_QUERY, &*EXPECTED_RESPONSE, router).await;
}

#[test]
fn test_merge_config() {
let config = serde_yaml::from_str::<Config>(
r#"
all:
dedup: true
query_deduplication: true
subgraphs:
products:
dedup: false
query_deduplication: false
"#,
)
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions apollo-router-core/src/query_planner/bridge_query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Service<QueryPlannerRequest> for BridgeQueryPlanner {
"presence of a query has been checked by the RouterService before; qed",
),
body.operation_name.to_owned(),
Default::default(),
req.query_plan_options,
)
.await
{
Expand All @@ -73,7 +73,7 @@ impl QueryPlanner for BridgeQueryPlanner {
&self,
query: String,
operation: Option<String>,
_options: QueryPlanOptions,
options: QueryPlanOptions,
) -> Result<Arc<query_planner::QueryPlan>, QueryPlannerError> {
let planner_result = self
.planner
Expand All @@ -90,6 +90,7 @@ impl QueryPlanner for BridgeQueryPlanner {
} => Ok(Arc::new(query_planner::QueryPlan {
usage_reporting,
root: node,
options,
})),
PlanSuccess {
data: QueryPlan { node: None },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
.clone()
.expect("presence of a query has been checked by the RouterService before; qed"),
body.operation_name.to_owned(),
QueryPlanOptions::default(),
request.query_plan_options,
);
let cm = self.cm.clone();
Box::pin(async move {
Expand Down