Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to disable batch requests support #744

Merged
merged 1 commit into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 27 additions & 2 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, Met
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::ErrorCode;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use tokio::net::{TcpListener, ToSocketAddrs};
Expand All @@ -57,6 +57,7 @@ pub struct Builder<M = ()> {
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
Expand All @@ -67,6 +68,7 @@ impl Default for Builder {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
max_response_body_size: TEN_MB_SIZE_BYTES,
batch_requests_supported: true,
resources: Resources::default(),
access_control: AccessControl::default(),
tokio_runtime: None,
Expand Down Expand Up @@ -112,6 +114,7 @@ impl<M> Builder<M> {
Builder {
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
access_control: self.access_control,
tokio_runtime: self.tokio_runtime,
Expand All @@ -137,6 +140,13 @@ impl<M> Builder<M> {
self
}

/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
self.batch_requests_supported = supported;
self
}

/// Register a new resource kind. Errors if `label` is already registered, or if the number of
/// registered resources on this server instance would exceed 8.
///
Expand Down Expand Up @@ -199,6 +209,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
Expand Down Expand Up @@ -241,6 +252,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
Expand Down Expand Up @@ -274,6 +286,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
Expand Down Expand Up @@ -323,6 +336,8 @@ pub struct Server<M = ()> {
max_request_body_size: u32,
/// Max response body size.
max_response_body_size: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Access control
access_control: AccessControl,
/// Tracker for currently used resources on the server
Expand All @@ -347,6 +362,7 @@ impl<M: Middleware> Server<M> {
let listener = self.listener;
let resources = self.resources;
let middleware = self.middleware;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;

let make_service = make_service_fn(move |_| {
Expand Down Expand Up @@ -405,6 +421,7 @@ impl<M: Middleware> Server<M> {
resources,
max_request_body_size,
max_response_body_size,
batch_requests_supported,
)
.await?;

Expand Down Expand Up @@ -494,6 +511,7 @@ async fn process_validated_request(
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let (parts, body) = request.into_parts();

Expand Down Expand Up @@ -570,7 +588,14 @@ async fn process_validated_request(
}
// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
if !batch_requests_supported {
// Server was configured to not support batches.
is_single = true;
sink.send_error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
} else if !batch.is_empty() {
let middleware = &middleware;

join_all(batch.into_iter().filter_map(move |req| {
Expand Down
22 changes: 22 additions & 0 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,25 @@ async fn can_set_the_max_response_size() {

handle.stop().unwrap();
}

#[tokio::test]
async fn disabled_batches() {
let addr = "127.0.0.1:0";
// Disable batches support.
let server = HttpServerBuilder::default().batch_requests_supported(false).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| Ok("ok")).unwrap();
let addr = server.local_addr().unwrap();
let uri = to_http_uri(addr);
let handle = server.start(module).unwrap();

// Send a valid batch.
let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response.body, batches_not_supported());

handle.stop().unwrap();
}
4 changes: 4 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub fn oversized_request() -> String {
r#"{"jsonrpc":"2.0","error":{"code":-32701,"message":"Request is too big"},"id":null}"#.into()
}

pub fn batches_not_supported() -> String {
r#"{"jsonrpc":"2.0","error":{"code":-32005,"message":"Batched requests are not supported by this server"},"id":null}"#.into()
}

pub fn oversized_response(id: Id, max_limit: u32) -> String {
format!(
r#"{{"jsonrpc":"2.0","error":{{"code":-32702,"message":"Response is too big","data":"Exceeded max limit {}"}},"id":{}}}"#,
Expand Down
4 changes: 4 additions & 0 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub const UNKNOWN_ERROR_CODE: i32 = -32001;
pub const SUBSCRIPTION_CLOSED: i32 = -32003;
/// Subscription got closed by the server.
pub const SUBSCRIPTION_CLOSED_WITH_ERROR: i32 = -32004;
/// Batched requests are not supported by the server.
pub const BATCHES_NOT_SUPPORTED_CODE: i32 = -32005;

/// Parse error message
pub const PARSE_ERROR_MSG: &str = "Parse error";
Expand All @@ -199,6 +201,8 @@ pub const METHOD_NOT_FOUND_MSG: &str = "Method not found";
pub const SERVER_IS_BUSY_MSG: &str = "Server is busy, try again later";
/// Reserved for implementation-defined server-errors.
pub const SERVER_ERROR_MSG: &str = "Server error";
/// Batched requests not supported error message.
pub const BATCHES_NOT_SUPPORTED_MSG: &str = "Batched requests are not supported by this server";

/// JSONRPC error code
#[derive(Error, Debug, PartialEq, Copy, Clone)]
Expand Down
22 changes: 20 additions & 2 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::ErrorCode;
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{join_all, FutureExt};
Expand Down Expand Up @@ -270,6 +270,7 @@ where
resources.clone(),
cfg.max_request_body_size,
cfg.max_response_body_size,
cfg.batch_requests_supported,
BoundedSubscriptions::new(cfg.max_subscriptions_per_connection),
stop_monitor.clone(),
middleware,
Expand All @@ -292,6 +293,7 @@ async fn background_task(
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
bounded_subscriptions: BoundedSubscriptions,
stop_server: StopMonitor,
middleware: impl Middleware,
Expand Down Expand Up @@ -490,7 +492,13 @@ async fn background_task(
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&d) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
if !batch.is_empty() {
if !batch_requests_supported {
sink.send_error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
middleware.on_response(request_start);
} else if !batch.is_empty() {
join_all(batch.into_iter().filter_map(move |req| {
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
Expand Down Expand Up @@ -656,6 +664,8 @@ struct Settings {
allowed_origins: AllowedValue,
/// Policy by which to accept or deny incoming requests based on the `Host` header.
allowed_hosts: AllowedValue,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}
Expand All @@ -667,6 +677,7 @@ impl Default for Settings {
max_response_body_size: TEN_MB_SIZE_BYTES,
max_subscriptions_per_connection: 1024,
max_connections: MAX_CONNECTIONS,
batch_requests_supported: true,
allowed_origins: AllowedValue::Any,
allowed_hosts: AllowedValue::Any,
tokio_runtime: None,
Expand Down Expand Up @@ -720,6 +731,13 @@ impl<M> Builder<M> {
self
}

/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
self.settings.batch_requests_supported = supported;
self
}

/// Set the maximum number of connections allowed. Default is 1024.
pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
self.settings.max_subscriptions_per_connection = max;
Expand Down
29 changes: 29 additions & 0 deletions ws-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,3 +692,32 @@ async fn custom_subscription_id_works() {
let unsub = client.send_request_text(call("unsubscribe_hello", vec!["0xdeadbeef"], Id::Num(1))).await.unwrap();
assert_eq!(&unsub, r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
}

#[tokio::test]
async fn disabled_batches() {
// Disable batches support.
let server = WsServerBuilder::default()
.batch_requests_supported(false)
.build("127.0.0.1:0")
.with_default_timeout()
.await
.unwrap()
.unwrap();

let mut module = RpcModule::new(());
module.register_method("should_ok", |_, _ctx| Ok("ok")).unwrap();
let addr = server.local_addr().unwrap();

let handle = server.start(module).unwrap();

// Send a valid batch.
let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();
let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, batches_not_supported());

handle.stop().unwrap();
}