From a585a722936f018da50b8c306f53af80b189a3bf Mon Sep 17 00:00:00 2001 From: Marcus Griep Date: Mon, 13 Jun 2022 17:48:27 -0400 Subject: [PATCH] feat!: remove codegen dependency on compression feature (#1004) --- examples/Cargo.toml | 4 +- examples/src/compression/client.rs | 5 +- examples/src/compression/server.rs | 5 +- tests/compression/Cargo.toml | 4 +- tests/compression/src/bidirectional_stream.rs | 9 +- tests/compression/src/client_stream.rs | 19 ++- tests/compression/src/compressing_request.rs | 15 ++- tests/compression/src/compressing_response.rs | 34 +++-- tests/compression/src/server_stream.rs | 13 +- tonic-build/Cargo.toml | 1 - tonic-build/src/client.rs | 12 +- tonic-build/src/server.rs | 40 +++--- tonic/Cargo.toml | 2 +- tonic/src/client/grpc.rs | 100 +++++---------- tonic/src/codec/compression.rs | 73 ++++++++--- tonic/src/codec/decode.rs | 118 ++++++----------- tonic/src/codec/encode.rs | 71 ++++------- tonic/src/codec/mod.rs | 3 - tonic/src/codegen.rs | 1 - tonic/src/response.rs | 4 +- tonic/src/server/grpc.rs | 119 ++++-------------- 21 files changed, 264 insertions(+), 388 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 35911625c..95e7aef7a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -200,7 +200,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } prost = "0.10" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { path = "../tonic", features = ["tls", "compression"] } +tonic = { path = "../tonic", features = ["tls", "gzip"] } tower = { version = "0.4" } # Required for routeguide rand = "0.8" @@ -237,4 +237,4 @@ tower-http = { version = "0.3", features = ["add-extension", "util"] } [build-dependencies] -tonic-build = { path = "../tonic-build", features = ["prost", "compression"] } +tonic-build = { path = "../tonic-build", features = ["prost"] } diff --git a/examples/src/compression/client.rs b/examples/src/compression/client.rs index 77ffeebe9..da4f7579f 100644 --- a/examples/src/compression/client.rs +++ b/examples/src/compression/client.rs @@ -1,5 +1,6 @@ use hello_world::greeter_client::GreeterClient; use hello_world::HelloRequest; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; pub mod hello_world { @@ -13,7 +14,9 @@ async fn main() -> Result<(), Box> { .await .unwrap(); - let mut client = GreeterClient::new(channel).send_gzip().accept_gzip(); + let mut client = GreeterClient::new(channel) + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); let request = tonic::Request::new(HelloRequest { name: "Tonic".into(), diff --git a/examples/src/compression/server.rs b/examples/src/compression/server.rs index 36f5081fd..fd035d218 100644 --- a/examples/src/compression/server.rs +++ b/examples/src/compression/server.rs @@ -2,6 +2,7 @@ use tonic::{transport::Server, Request, Response, Status}; use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; +use tonic::codec::CompressionEncoding; pub mod hello_world { tonic::include_proto!("helloworld"); @@ -32,7 +33,9 @@ async fn main() -> Result<(), Box> { println!("GreeterServer listening on {}", addr); - let service = GreeterServer::new(greeter).send_gzip().accept_gzip(); + let service = GreeterServer::new(greeter) + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); Server::builder().add_service(service).serve(addr).await?; diff --git a/tests/compression/Cargo.toml b/tests/compression/Cargo.toml index f97fd6fa0..c03a896fe 100644 --- a/tests/compression/Cargo.toml +++ b/tests/compression/Cargo.toml @@ -16,9 +16,9 @@ pin-project = "1.0" prost = "0.10" tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]} tokio-stream = {version = "0.1.5", features = ["net"]} -tonic = {path = "../../tonic", features = ["compression"]} +tonic = {path = "../../tonic", features = ["gzip"]} tower = {version = "0.4", features = []} tower-http = {version = "0.3", features = ["map-response-body", "map-request-body"]} [build-dependencies] -tonic-build = {path = "../../tonic-build", features = ["compression"]} +tonic-build = {path = "../../tonic-build" } diff --git a/tests/compression/src/bidirectional_stream.rs b/tests/compression/src/bidirectional_stream.rs index 55461cde5..a92783c34 100644 --- a/tests/compression/src/bidirectional_stream.rs +++ b/tests/compression/src/bidirectional_stream.rs @@ -1,12 +1,13 @@ use super::*; +use tonic::codec::CompressionEncoding; #[tokio::test(flavor = "multi_thread")] async fn client_enabled_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); let svc = test_server::TestServer::new(Svc::default()) - .accept_gzip() - .send_gzip(); + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); let request_bytes_counter = Arc::new(AtomicUsize::new(0)); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -43,8 +44,8 @@ async fn client_enabled_server_enabled() { }); let mut client = test_client::TestClient::new(mock_io_channel(client).await) - .send_gzip() - .accept_gzip(); + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); diff --git a/tests/compression/src/client_stream.rs b/tests/compression/src/client_stream.rs index e8ad6dab7..a749c2b58 100644 --- a/tests/compression/src/client_stream.rs +++ b/tests/compression/src/client_stream.rs @@ -1,11 +1,13 @@ use super::*; use http_body::Body as _; +use tonic::codec::CompressionEncoding; #[tokio::test(flavor = "multi_thread")] async fn client_enabled_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip); let request_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -33,7 +35,8 @@ async fn client_enabled_server_enabled() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .send_compressed(CompressionEncoding::Gzip); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); @@ -49,7 +52,8 @@ async fn client_enabled_server_enabled() { async fn client_disabled_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip); let request_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -103,7 +107,8 @@ async fn client_enabled_server_disabled() { .unwrap(); }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .send_compressed(CompressionEncoding::Gzip); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); @@ -122,7 +127,8 @@ async fn client_enabled_server_disabled() { async fn compressing_response_from_client_stream() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).send_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -147,7 +153,8 @@ async fn compressing_response_from_client_stream() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let stream = futures::stream::iter(vec![]); let req = Request::new(Box::pin(stream)); diff --git a/tests/compression/src/compressing_request.rs b/tests/compression/src/compressing_request.rs index a5352945a..dd0536091 100644 --- a/tests/compression/src/compressing_request.rs +++ b/tests/compression/src/compressing_request.rs @@ -1,11 +1,13 @@ use super::*; use http_body::Body as _; +use tonic::codec::CompressionEncoding; #[tokio::test(flavor = "multi_thread")] async fn client_enabled_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip); let request_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -35,7 +37,8 @@ async fn client_enabled_server_enabled() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .send_compressed(CompressionEncoding::Gzip); for _ in 0..3 { client @@ -63,7 +66,8 @@ async fn client_enabled_server_disabled() { .unwrap(); }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .send_compressed(CompressionEncoding::Gzip); let status = client .compress_input_unary(SomeData { @@ -88,7 +92,8 @@ async fn client_enabled_server_disabled() { async fn client_mark_compressed_without_header_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip); tokio::spawn({ async move { @@ -107,7 +112,7 @@ async fn client_mark_compressed_without_header_server_enabled() { Ok(req) }, ) - .send_gzip(); + .send_compressed(CompressionEncoding::Gzip); let status = client .compress_input_unary(SomeData { diff --git a/tests/compression/src/compressing_response.rs b/tests/compression/src/compressing_response.rs index 28674a5d5..5c1cb9fa9 100644 --- a/tests/compression/src/compressing_response.rs +++ b/tests/compression/src/compressing_response.rs @@ -1,4 +1,5 @@ use super::*; +use tonic::codec::CompressionEncoding; #[tokio::test(flavor = "multi_thread")] async fn client_enabled_server_enabled() { @@ -31,7 +32,8 @@ async fn client_enabled_server_enabled() { } } - let svc = test_server::TestServer::new(Svc::default()).send_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -57,7 +59,8 @@ async fn client_enabled_server_enabled() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); for _ in 0..3 { let res = client.compress_output_unary(()).await.unwrap(); @@ -97,7 +100,8 @@ async fn client_enabled_server_disabled() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let res = client.compress_output_unary(()).await.unwrap(); @@ -135,7 +139,8 @@ async fn client_disabled() { } } - let svc = test_server::TestServer::new(Svc::default()).send_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -175,7 +180,8 @@ async fn client_disabled() { async fn server_replying_with_unsupported_encoding() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).send_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); fn add_weird_content_encoding(mut response: http::Response) -> http::Response { response @@ -197,7 +203,8 @@ async fn server_replying_with_unsupported_encoding() { .unwrap(); }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let status: Status = client.compress_output_unary(()).await.unwrap_err(); assert_eq!(status.code(), tonic::Code::Unimplemented); @@ -214,7 +221,7 @@ async fn disabling_compression_on_single_response() { let svc = test_server::TestServer::new(Svc { disable_compressing_on_response: true, }) - .send_gzip(); + .send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -239,7 +246,8 @@ async fn disabling_compression_on_single_response() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let res = client.compress_output_unary(()).await.unwrap(); assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); @@ -254,7 +262,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() { let svc = test_server::TestServer::new(Svc { disable_compressing_on_response: true, }) - .send_gzip(); + .send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -279,7 +287,8 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let res = client.compress_output_server_stream(()).await.unwrap(); @@ -309,7 +318,7 @@ async fn disabling_compression_on_response_from_client_stream() { let svc = test_server::TestServer::new(Svc { disable_compressing_on_response: true, }) - .send_gzip(); + .send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -334,7 +343,8 @@ async fn disabling_compression_on_response_from_client_stream() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let stream = futures::stream::iter(vec![]); let req = Request::new(Box::pin(stream)); diff --git a/tests/compression/src/server_stream.rs b/tests/compression/src/server_stream.rs index 3d82cff55..2ec52bb08 100644 --- a/tests/compression/src/server_stream.rs +++ b/tests/compression/src/server_stream.rs @@ -1,11 +1,13 @@ use super::*; +use tonic::codec::CompressionEncoding; use tonic::Streaming; #[tokio::test(flavor = "multi_thread")] async fn client_enabled_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).send_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -30,7 +32,8 @@ async fn client_enabled_server_enabled() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let res = client.compress_output_server_stream(()).await.unwrap(); @@ -57,7 +60,8 @@ async fn client_enabled_server_enabled() { async fn client_disabled_server_enabled() { let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); - let svc = test_server::TestServer::new(Svc::default()).send_gzip(); + let svc = + test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); let response_bytes_counter = Arc::new(AtomicUsize::new(0)); @@ -127,7 +131,8 @@ async fn client_enabled_server_disabled() { } }); - let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); + let mut client = test_client::TestClient::new(mock_io_channel(client).await) + .accept_compressed(CompressionEncoding::Gzip); let res = client.compress_output_server_stream(()).await.unwrap(); diff --git a/tonic-build/Cargo.toml b/tonic-build/Cargo.toml index 76198abb2..4383b0383 100644 --- a/tonic-build/Cargo.toml +++ b/tonic-build/Cargo.toml @@ -22,7 +22,6 @@ quote = "1.0" syn = "1.0" [features] -compression = [] default = ["transport", "prost"] prost = ["prost-build"] transport = [] diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index 4c3198484..fa89724eb 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -79,20 +79,20 @@ pub fn generate( #service_ident::new(InterceptedService::new(inner, interceptor)) } - /// Compress requests with `gzip`. + /// Compress requests with the given encoding. /// /// This requires the server to support it otherwise it might respond with an /// error. #[must_use] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); self } - /// Enable decompressing responses with `gzip`. + /// Enable decompressing responses. #[must_use] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index affd2db38..135b0f174 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -39,32 +39,20 @@ pub fn generate( let mod_attributes = attributes.for_mod(package); let struct_attributes = attributes.for_struct(&path); - let compression_enabled = cfg!(feature = "compression"); - - let compression_config_ty = if compression_enabled { - quote! { EnabledCompressionEncodings } - } else { - quote! { () } - }; - - let configure_compression_methods = if compression_enabled { - quote! { - /// Enable decompressing requests with `gzip`. - #[must_use] - pub fn accept_gzip(mut self) -> Self { - self.accept_compression_encodings.enable_gzip(); - self - } + let configure_compression_methods = quote! { + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } - /// Compress responses with `gzip`, if the client supports it. - #[must_use] - pub fn send_gzip(mut self) -> Self { - self.send_compression_encodings.enable_gzip(); - self - } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self } - } else { - quote! {} }; quote! { @@ -87,8 +75,8 @@ pub fn generate( #[derive(Debug)] pub struct #server_service { inner: _Inner, - accept_compression_encodings: #compression_config_ty, - send_compression_encodings: #compression_config_ty, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, } struct _Inner(Arc); diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index f101c7105..c274ca93d 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -24,7 +24,7 @@ version = "0.7.2" [features] codegen = ["async-trait"] -compression = ["flate2"] +gzip = ["flate2"] default = ["transport", "codegen", "prost"] prost = ["prost1", "prost-derive"] tls = ["rustls-pemfile", "transport", "tokio-rustls"] diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 19c994011..9ad0bf1da 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "compression")] use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings}; use crate::{ body::BoxBody, @@ -31,10 +30,8 @@ use std::fmt; /// [gRPC protocol definition]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests pub struct Grpc { inner: T, - #[cfg(feature = "compression")] /// Which compression encodings does the client accept? accept_compression_encodings: EnabledCompressionEncodings, - #[cfg(feature = "compression")] /// The compression encoding that will be applied to requests. send_compression_encodings: Option, } @@ -44,16 +41,14 @@ impl Grpc { pub fn new(inner: T) -> Self { Self { inner, - #[cfg(feature = "compression")] send_compression_encodings: None, - #[cfg(feature = "compression")] accept_compression_encodings: EnabledCompressionEncodings::default(), } } - /// Compress requests with `gzip`. + /// Compress requests with the provided encoding. /// - /// Requires the server to accept `gzip` otherwise it might return an error. + /// Requires the server to accept the specified encoding, otherwise it might return an error. /// /// # Example /// @@ -61,10 +56,11 @@ impl Grpc { /// /// ```rust /// use tonic::transport::Channel; + /// # enum CompressionEncoding { Gzip } /// # struct TestClient(T); /// # impl TestClient { /// # fn new(channel: T) -> Self { Self(channel) } - /// # fn send_gzip(self) -> Self { self } + /// # fn send_compressed(self, _: CompressionEncoding) -> Self { self } /// # } /// /// # async { @@ -73,25 +69,15 @@ impl Grpc { /// .await /// .unwrap(); /// - /// let client = TestClient::new(channel).send_gzip(); + /// let client = TestClient::new(channel).send_compressed(CompressionEncoding::Gzip); /// # }; /// ``` - #[cfg(feature = "compression")] - #[cfg_attr(docsrs, doc(cfg(feature = "compression")))] - pub fn send_gzip(mut self) -> Self { - self.send_compression_encodings = Some(CompressionEncoding::Gzip); + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings = Some(encoding); self } - #[doc(hidden)] - #[cfg(not(feature = "compression"))] - pub fn send_gzip(self) -> Self { - panic!( - "`send_gzip` called on a client but the `compression` feature is not enabled on tonic" - ); - } - - /// Enable accepting `gzip` compressed responses. + /// Enable accepting compressed responses. /// /// Requires the server to also support sending compressed responses. /// @@ -101,10 +87,11 @@ impl Grpc { /// /// ```rust /// use tonic::transport::Channel; + /// # enum CompressionEncoding { Gzip } /// # struct TestClient(T); /// # impl TestClient { /// # fn new(channel: T) -> Self { Self(channel) } - /// # fn accept_gzip(self) -> Self { self } + /// # fn accept_compressed(self, _: CompressionEncoding) -> Self { self } /// # } /// /// # async { @@ -113,22 +100,14 @@ impl Grpc { /// .await /// .unwrap(); /// - /// let client = TestClient::new(channel).accept_gzip(); + /// let client = TestClient::new(channel).accept_compressed(CompressionEncoding::Gzip); /// # }; /// ``` - #[cfg(feature = "compression")] - #[cfg_attr(docsrs, doc(cfg(feature = "compression")))] - pub fn accept_gzip(mut self) -> Self { - self.accept_compression_encodings.enable_gzip(); + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); self } - #[doc(hidden)] - #[cfg(not(feature = "compression"))] - pub fn accept_gzip(self) -> Self { - panic!("`accept_gzip` called on a client but the `compression` feature is not enabled on tonic"); - } - /// Check if the inner [`GrpcService`] is able to accept a new request. /// /// This will call [`GrpcService::poll_ready`] until it returns ready or @@ -238,14 +217,7 @@ impl Grpc { let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri"); let request = request - .map(|s| { - encode_client( - codec.encoder(), - s, - #[cfg(feature = "compression")] - self.send_compression_encodings, - ) - }) + .map(|s| encode_client(codec.encoder(), s, self.send_compression_encodings)) .map(BoxBody::new); let mut request = request.into_http( @@ -265,24 +237,21 @@ impl Grpc { .headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc")); - #[cfg(feature = "compression")] - { - if let Some(encoding) = self.send_compression_encodings { - request.headers_mut().insert( - crate::codec::compression::ENCODING_HEADER, - encoding.into_header_value(), - ); - } + if let Some(encoding) = self.send_compression_encodings { + request.headers_mut().insert( + crate::codec::compression::ENCODING_HEADER, + encoding.into_header_value(), + ); + } - if let Some(header_value) = self - .accept_compression_encodings - .into_accept_encoding_header_value() - { - request.headers_mut().insert( - crate::codec::compression::ACCEPT_ENCODING_HEADER, - header_value, - ); - } + if let Some(header_value) = self + .accept_compression_encodings + .into_accept_encoding_header_value() + { + request.headers_mut().insert( + crate::codec::compression::ACCEPT_ENCODING_HEADER, + header_value, + ); } let response = self @@ -291,7 +260,6 @@ impl Grpc { .await .map_err(|err| Status::from_error(err.into()))?; - #[cfg(feature = "compression")] let encoding = CompressionEncoding::from_encoding_header( response.headers(), self.accept_compression_encodings, @@ -314,13 +282,7 @@ impl Grpc { let response = response.map(|body| { if expect_additional_trailers { - Streaming::new_response( - codec.decoder(), - body, - status_code, - #[cfg(feature = "compression")] - encoding, - ) + Streaming::new_response(codec.decoder(), body, status_code, encoding) } else { Streaming::new_empty(codec.decoder(), body) } @@ -334,9 +296,7 @@ impl Clone for Grpc { fn clone(&self) -> Self { Self { inner: self.inner.clone(), - #[cfg(feature = "compression")] send_compression_encodings: self.send_compression_encodings, - #[cfg(feature = "compression")] accept_compression_encodings: self.accept_compression_encodings, } } @@ -348,10 +308,8 @@ impl fmt::Debug for Grpc { f.field("inner", &self.inner); - #[cfg(feature = "compression")] f.field("compression_encoding", &self.send_compression_encodings); - #[cfg(feature = "compression")] f.field( "accept_compression_encodings", &self.accept_compression_encodings, diff --git a/tonic/src/codec/compression.rs b/tonic/src/codec/compression.rs index fa57a826c..7063bd865 100644 --- a/tonic/src/codec/compression.rs +++ b/tonic/src/codec/compression.rs @@ -1,6 +1,7 @@ use super::encode::BUFFER_SIZE; use crate::{metadata::MetadataValue, Status}; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BytesMut}; +#[cfg(feature = "gzip")] use flate2::read::{GzDecoder, GzEncoder}; use std::fmt; @@ -10,28 +11,44 @@ pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; /// Struct used to configure which encodings are enabled on a server or channel. #[derive(Debug, Default, Clone, Copy)] pub struct EnabledCompressionEncodings { + #[cfg(feature = "gzip")] pub(crate) gzip: bool, } impl EnabledCompressionEncodings { - /// Check if `gzip` compression is enabled. - pub fn gzip(self) -> bool { - self.gzip + /// Check if a [`CompressionEncoding`] is enabled. + pub fn is_enabled(&self, encoding: CompressionEncoding) -> bool { + match encoding { + #[cfg(feature = "gzip")] + CompressionEncoding::Gzip => self.gzip, + } } - /// Enable `gzip` compression. - pub fn enable_gzip(&mut self) { - self.gzip = true; + /// Enable a [`CompressionEncoding`]. + pub fn enable(&mut self, encoding: CompressionEncoding) { + match encoding { + #[cfg(feature = "gzip")] + CompressionEncoding::Gzip => self.gzip = true, + } } pub(crate) fn into_accept_encoding_header_value(self) -> Option { - let Self { gzip } = self; - if gzip { + if self.is_gzip_enabled() { Some(http::HeaderValue::from_static("gzip,identity")) } else { None } } + + #[cfg(feature = "gzip")] + const fn is_gzip_enabled(&self) -> bool { + self.gzip + } + + #[cfg(not(feature = "gzip"))] + const fn is_gzip_enabled(&self) -> bool { + false + } } /// The compression encodings Tonic supports. @@ -39,6 +56,8 @@ impl EnabledCompressionEncodings { #[non_exhaustive] pub enum CompressionEncoding { #[allow(missing_docs)] + #[cfg(feature = "gzip")] + #[cfg_attr(docsrs, doc(cfg(feature = "gzip")))] Gzip, } @@ -48,13 +67,16 @@ impl CompressionEncoding { map: &http::HeaderMap, enabled_encodings: EnabledCompressionEncodings, ) -> Option { + if !enabled_encodings.is_gzip_enabled() { + return None; + } + let header_value = map.get(ACCEPT_ENCODING_HEADER)?; let header_value_str = header_value.to_str().ok()?; - let EnabledCompressionEncodings { gzip } = enabled_encodings; - split_by_comma(header_value_str).find_map(|value| match value { - "gzip" if gzip => Some(CompressionEncoding::Gzip), + #[cfg(feature = "gzip")] + "gzip" => Some(CompressionEncoding::Gzip), _ => None, }) } @@ -76,10 +98,11 @@ impl CompressionEncoding { return Ok(None); }; - let EnabledCompressionEncodings { gzip } = enabled_encodings; - match header_value_str { - "gzip" if gzip => Ok(Some(CompressionEncoding::Gzip)), + #[cfg(feature = "gzip")] + "gzip" if enabled_encodings.is_enabled(CompressionEncoding::Gzip) => { + Ok(Some(CompressionEncoding::Gzip)) + } "identity" => Ok(None), other => { let mut status = Status::unimplemented(format!( @@ -102,14 +125,24 @@ impl CompressionEncoding { pub(crate) fn into_header_value(self) -> http::HeaderValue { match self { + #[cfg(feature = "gzip")] CompressionEncoding::Gzip => http::HeaderValue::from_static("gzip"), } } + + pub(crate) fn encodings() -> &'static [Self] { + &[ + #[cfg(feature = "gzip")] + CompressionEncoding::Gzip, + ] + } } impl fmt::Display for CompressionEncoding { + #[allow(unused_variables)] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { + match *self { + #[cfg(feature = "gzip")] CompressionEncoding::Gzip => write!(f, "gzip"), } } @@ -120,6 +153,7 @@ fn split_by_comma(s: &str) -> impl Iterator { } /// Compress `len` bytes from `decompressed_buf` into `out_buf`. +#[allow(unused_variables, unreachable_code)] pub(crate) fn compress( encoding: CompressionEncoding, decompressed_buf: &mut BytesMut, @@ -130,13 +164,14 @@ pub(crate) fn compress( out_buf.reserve(capacity); match encoding { + #[cfg(feature = "gzip")] CompressionEncoding::Gzip => { let mut gzip_encoder = GzEncoder::new( &decompressed_buf[0..len], // FIXME: support customizing the compression level flate2::Compression::new(6), ); - let mut out_writer = out_buf.writer(); + let mut out_writer = bytes::BufMut::writer(out_buf); std::io::copy(&mut gzip_encoder, &mut out_writer)?; } @@ -148,6 +183,7 @@ pub(crate) fn compress( } /// Decompress `len` bytes from `compressed_buf` into `out_buf`. +#[allow(unused_variables, unreachable_code)] pub(crate) fn decompress( encoding: CompressionEncoding, compressed_buf: &mut BytesMut, @@ -159,9 +195,10 @@ pub(crate) fn decompress( out_buf.reserve(capacity); match encoding { + #[cfg(feature = "gzip")] CompressionEncoding::Gzip => { let mut gzip_decoder = GzDecoder::new(&compressed_buf[0..len]); - let mut out_writer = out_buf.writer(); + let mut out_writer = bytes::BufMut::writer(out_buf); std::io::copy(&mut gzip_decoder, &mut out_writer)?; } diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 2c0a2f883..03614b1ac 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "compression")] use super::compression::{decompress, CompressionEncoding}; use super::{DecodeBuf, Decoder, HEADER_SIZE}; use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; @@ -27,18 +26,19 @@ pub struct Streaming { direction: Direction, buf: BytesMut, trailers: Option, - #[cfg(feature = "compression")] decompress_buf: BytesMut, - #[cfg(feature = "compression")] encoding: Option, } impl Unpin for Streaming {} -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum State { ReadHeader, - ReadBody { compression: bool, len: usize }, + ReadBody { + compression: Option, + len: usize, + }, Error, } @@ -54,20 +54,14 @@ impl Streaming { decoder: D, body: B, status_code: StatusCode, - #[cfg(feature = "compression")] encoding: Option, + encoding: Option, ) -> Self where B: Body + Send + 'static, B::Error: Into, D: Decoder + Send + 'static, { - Self::new( - decoder, - body, - Direction::Response(status_code), - #[cfg(feature = "compression")] - encoding, - ) + Self::new(decoder, body, Direction::Response(status_code), encoding) } pub(crate) fn new_empty(decoder: D, body: B) -> Self @@ -76,40 +70,24 @@ impl Streaming { B::Error: Into, D: Decoder + Send + 'static, { - Self::new( - decoder, - body, - Direction::EmptyResponse, - #[cfg(feature = "compression")] - None, - ) + Self::new(decoder, body, Direction::EmptyResponse, None) } #[doc(hidden)] - pub fn new_request( - decoder: D, - body: B, - #[cfg(feature = "compression")] encoding: Option, - ) -> Self + pub fn new_request(decoder: D, body: B, encoding: Option) -> Self where B: Body + Send + 'static, B::Error: Into, D: Decoder + Send + 'static, { - Self::new( - decoder, - body, - Direction::Request, - #[cfg(feature = "compression")] - encoding, - ) + Self::new(decoder, body, Direction::Request, encoding) } fn new( decoder: D, body: B, direction: Direction, - #[cfg(feature = "compression")] encoding: Option, + encoding: Option, ) -> Self where B: Body + Send + 'static, @@ -126,9 +104,7 @@ impl Streaming { direction, buf: BytesMut::with_capacity(BUFFER_SIZE), trailers: None, - #[cfg(feature = "compression")] decompress_buf: BytesMut::new(), - #[cfg(feature = "compression")] encoding, } } @@ -217,13 +193,12 @@ impl Streaming { return Ok(None); } - let is_compressed = match self.buf.get_u8() { - 0 => false, + let compression_encoding = match self.buf.get_u8() { + 0 => None, 1 => { - #[cfg(feature = "compression")] { if self.encoding.is_some() { - true + self.encoding } else { // https://grpc.github.io/grpc/core/md_doc_compression.html // An ill-constructed message with its Compressed-Flag bit set but lacking a grpc-encoding @@ -232,13 +207,6 @@ impl Streaming { return Err(Status::new(Code::Internal, "protocol error: received message with compressed-flag but no grpc-encoding was specified")); } } - #[cfg(not(feature = "compression"))] - { - return Err(Status::new( - Code::Unimplemented, - "Message compressed, compression support not enabled.".to_string(), - )); - } } f => { trace!("unexpected compression flag"); @@ -257,54 +225,40 @@ impl Streaming { self.buf.reserve(len); self.state = State::ReadBody { - compression: is_compressed, + compression: compression_encoding, len, } } - if let State::ReadBody { len, compression } = &self.state { + if let State::ReadBody { len, compression } = self.state { // if we haven't read enough of the message then return and keep // reading - if self.buf.remaining() < *len || self.buf.len() < *len { + if self.buf.remaining() < len || self.buf.len() < len { return Ok(None); } - let decoding_result = if *compression { - #[cfg(feature = "compression")] + let decoding_result = if let Some(encoding) = compression { + self.decompress_buf.clear(); + + if let Err(err) = decompress(encoding, &mut self.buf, &mut self.decompress_buf, len) { - self.decompress_buf.clear(); - - if let Err(err) = decompress( - self.encoding.unwrap_or_else(|| { - // SAFETY: The check while in State::ReadHeader would already have returned Code::Internal - unreachable!("message was compressed but `Streaming.encoding` was `None`. This is a bug in Tonic. Please file an issue") - }), - &mut self.buf, - &mut self.decompress_buf, - *len, - ) { - let message = if let Direction::Response(status) = self.direction { - format!( - "Error decompressing: {}, while receiving response with status: {}", - err, status - ) - } else { - format!("Error decompressing: {}, while sending request", err) - }; - return Err(Status::new(Code::Internal, message)); - } - let decompressed_len = self.decompress_buf.len(); - self.decoder.decode(&mut DecodeBuf::new( - &mut self.decompress_buf, - decompressed_len, - )) + let message = if let Direction::Response(status) = self.direction { + format!( + "Error decompressing: {}, while receiving response with status: {}", + err, status + ) + } else { + format!("Error decompressing: {}, while sending request", err) + }; + return Err(Status::new(Code::Internal, message)); } - - #[cfg(not(feature = "compression"))] - unreachable!("should not take this branch if compression is disabled") + let decompressed_len = self.decompress_buf.len(); + self.decoder.decode(&mut DecodeBuf::new( + &mut self.decompress_buf, + decompressed_len, + )) } else { - self.decoder - .decode(&mut DecodeBuf::new(&mut self.buf, *len)) + self.decoder.decode(&mut DecodeBuf::new(&mut self.buf, len)) }; return match decoding_result { diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 3e2c1a327..fbeb1e870 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "compression")] use super::compression::{compress, CompressionEncoding, SingleMessageCompressionOverride}; use super::{EncodeBuf, Encoder, HEADER_SIZE}; use crate::{Code, Status}; @@ -18,22 +17,14 @@ pub(super) const BUFFER_SIZE: usize = 8 * 1024; pub(crate) fn encode_server( encoder: T, source: U, - #[cfg(feature = "compression")] compression_encoding: Option, - #[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride, + compression_encoding: Option, + compression_override: SingleMessageCompressionOverride, ) -> EncodeBody>> where T: Encoder, U: Stream>, { - let stream = encode( - encoder, - source, - #[cfg(feature = "compression")] - compression_encoding, - #[cfg(feature = "compression")] - compression_override, - ) - .into_stream(); + let stream = encode(encoder, source, compression_encoding, compression_override).into_stream(); EncodeBody::new_server(stream) } @@ -41,7 +32,7 @@ where pub(crate) fn encode_client( encoder: T, source: U, - #[cfg(feature = "compression")] compression_encoding: Option, + compression_encoding: Option, ) -> EncodeBody>> where T: Encoder, @@ -50,9 +41,7 @@ where let stream = encode( encoder, source.map(Ok), - #[cfg(feature = "compression")] compression_encoding, - #[cfg(feature = "compression")] SingleMessageCompressionOverride::default(), ) .into_stream(); @@ -62,8 +51,8 @@ where fn encode( mut encoder: T, source: U, - #[cfg(feature = "compression")] compression_encoding: Option, - #[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride, + compression_encoding: Option, + compression_override: SingleMessageCompressionOverride, ) -> impl TryStream where T: Encoder, @@ -72,17 +61,17 @@ where async_stream::stream! { let mut buf = BytesMut::with_capacity(BUFFER_SIZE); - #[cfg(feature = "compression")] - let (compression_enabled_for_stream, mut uncompression_buf) = match compression_encoding { - Some(CompressionEncoding::Gzip) => (true, BytesMut::with_capacity(BUFFER_SIZE)), - None => (false, BytesMut::new()), + let compression_encoding = if compression_override == SingleMessageCompressionOverride::Disable { + None + } else { + compression_encoding }; - #[cfg(feature = "compression")] - let compress_item = compression_enabled_for_stream && compression_override == SingleMessageCompressionOverride::Inherit; - - #[cfg(not(feature = "compression"))] - let compress_item = false; + let mut uncompression_buf = if compression_encoding.is_some() { + BytesMut::with_capacity(BUFFER_SIZE) + } else { + BytesMut::new() + }; futures_util::pin_mut!(source); @@ -94,26 +83,20 @@ where buf.advance_mut(HEADER_SIZE); } - if compress_item { - #[cfg(feature = "compression")] - { - uncompression_buf.clear(); + if let Some(encoding) = compression_encoding { + uncompression_buf.clear(); - encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf)) - .map_err(|err| Status::internal(format!("Error encoding: {}", err)))?; - - let uncompressed_len = uncompression_buf.len(); + encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf)) + .map_err(|err| Status::internal(format!("Error encoding: {}", err)))?; - compress( - compression_encoding.unwrap(), - &mut uncompression_buf, - &mut buf, - uncompressed_len, - ).map_err(|err| Status::internal(format!("Error compressing: {}", err)))?; - } + let uncompressed_len = uncompression_buf.len(); - #[cfg(not(feature = "compression"))] - unreachable!("compression disabled, should not take this branch"); + compress( + encoding, + &mut uncompression_buf, + &mut buf, + uncompressed_len, + ).map_err(|err| Status::internal(format!("Error compressing: {}", err)))?; } else { encoder.encode(item, &mut EncodeBuf::new(&mut buf)) .map_err(|err| Status::internal(format!("Error encoding: {}", err)))?; @@ -124,7 +107,7 @@ where assert!(len <= std::u32::MAX as usize); { let mut buf = &mut buf[..HEADER_SIZE]; - buf.put_u8(compress_item as u8); + buf.put_u8(compression_encoding.is_some() as u8); buf.put_u32(len as u32); } diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index f10d9fe63..cc330b14c 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -4,7 +4,6 @@ //! and a protobuf codec based on prost. mod buffer; -#[cfg(feature = "compression")] pub(crate) mod compression; mod decode; mod encode; @@ -17,8 +16,6 @@ use std::io; pub(crate) use self::encode::{encode_client, encode_server}; pub use self::buffer::{DecodeBuf, EncodeBuf}; -#[cfg(feature = "compression")] -#[cfg_attr(docsrs, doc(cfg(feature = "compression")))] pub use self::compression::{CompressionEncoding, EnabledCompressionEncodings}; pub use self::decode::Streaming; #[cfg(feature = "prost")] diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index fae80235f..b8dd39c20 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -10,7 +10,6 @@ pub use std::sync::Arc; pub use std::task::{Context, Poll}; pub use tower_service::Service; pub type StdError = Box; -#[cfg(feature = "compression")] pub use crate::codec::{CompressionEncoding, EnabledCompressionEncodings}; pub use crate::service::interceptor::InterceptedService; pub use bytes::Bytes; diff --git a/tonic/src/response.rs b/tonic/src/response.rs index 89fc98706..68d035d48 100644 --- a/tonic/src/response.rs +++ b/tonic/src/response.rs @@ -116,8 +116,8 @@ impl Response { /// **Note**: This only has effect on responses to unary requests and responses to client to /// server streams. Response streams (server to client stream and bidirectional streams) will /// still be compressed according to the configuration of the server. - #[cfg(feature = "compression")] - #[cfg_attr(docsrs, doc(cfg(feature = "compression")))] + #[cfg(feature = "gzip")] + #[cfg_attr(docsrs, doc(cfg(feature = "gzip")))] pub fn disable_compression(&mut self) { self.extensions_mut() .insert(crate::codec::compression::SingleMessageCompressionOverride::Disable); diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index c9979a802..cbe8450ff 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "compression")] use crate::codec::compression::{ CompressionEncoding, EnabledCompressionEncodings, SingleMessageCompressionOverride, }; @@ -34,10 +33,8 @@ macro_rules! t { pub struct Grpc { codec: T, /// Which compression encodings does the server accept for requests? - #[cfg(feature = "compression")] accept_compression_encodings: EnabledCompressionEncodings, /// Which compression encodings might the server use for responses. - #[cfg(feature = "compression")] send_compression_encodings: EnabledCompressionEncodings, } @@ -49,14 +46,12 @@ where pub fn new(codec: T) -> Self { Self { codec, - #[cfg(feature = "compression")] accept_compression_encodings: EnabledCompressionEncodings::default(), - #[cfg(feature = "compression")] send_compression_encodings: EnabledCompressionEncodings::default(), } } - /// Enable accepting `gzip` compressed requests. + /// Enable accepting compressed requests. /// /// If a request with an unsupported encoding is received the server will respond with /// [`Code::UnUnimplemented`](crate::Code). @@ -66,11 +61,12 @@ where /// The most common way of using this is through a server generated by tonic-build: /// /// ```rust + /// # enum CompressionEncoding { Gzip } /// # struct Svc; /// # struct ExampleServer(T); /// # impl ExampleServer { /// # fn new(svc: T) -> Self { Self(svc) } - /// # fn accept_gzip(self) -> Self { self } + /// # fn accept_compressed(self, _: CompressionEncoding) -> Self { self } /// # } /// # #[tonic::async_trait] /// # trait Example {} @@ -80,22 +76,14 @@ where /// // ... /// } /// - /// let service = ExampleServer::new(Svc).accept_gzip(); + /// let service = ExampleServer::new(Svc).accept_compressed(CompressionEncoding::Gzip); /// ``` - #[cfg(feature = "compression")] - #[cfg_attr(docsrs, doc(cfg(feature = "compression")))] - pub fn accept_gzip(mut self) -> Self { - self.accept_compression_encodings.enable_gzip(); + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); self } - #[doc(hidden)] - #[cfg(not(feature = "compression"))] - pub fn accept_gzip(self) -> Self { - panic!("`accept_gzip` called on a server but the `compression` feature is not enabled on tonic"); - } - - /// Enable sending `gzip` compressed responses. + /// Enable sending compressed responses. /// /// Requires the client to also support receiving compressed responses. /// @@ -104,11 +92,12 @@ where /// The most common way of using this is through a server generated by tonic-build: /// /// ```rust + /// # enum CompressionEncoding { Gzip } /// # struct Svc; /// # struct ExampleServer(T); /// # impl ExampleServer { /// # fn new(svc: T) -> Self { Self(svc) } - /// # fn send_gzip(self) -> Self { self } + /// # fn send_compressed(self, _: CompressionEncoding) -> Self { self } /// # } /// # #[tonic::async_trait] /// # trait Example {} @@ -118,24 +107,13 @@ where /// // ... /// } /// - /// let service = ExampleServer::new(Svc).send_gzip(); + /// let service = ExampleServer::new(Svc).send_compressed(CompressionEncoding::Gzip); /// ``` - #[cfg(feature = "compression")] - #[cfg_attr(docsrs, doc(cfg(feature = "compression")))] - pub fn send_gzip(mut self) -> Self { - self.send_compression_encodings.enable_gzip(); + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); self } - #[doc(hidden)] - #[cfg(not(feature = "compression"))] - pub fn send_gzip(self) -> Self { - panic!( - "`send_gzip` called on a server but the `compression` feature is not enabled on tonic" - ); - } - - #[cfg(feature = "compression")] #[doc(hidden)] pub fn apply_compression_config( self, @@ -144,26 +122,18 @@ where ) -> Self { let mut this = self; - let EnabledCompressionEncodings { gzip: accept_gzip } = accept_encodings; - if accept_gzip { - this = this.accept_gzip(); - } - - let EnabledCompressionEncodings { gzip: send_gzip } = send_encodings; - if send_gzip { - this = this.send_gzip(); + for &encoding in CompressionEncoding::encodings() { + if accept_encodings.is_enabled(encoding) { + this = this.accept_compressed(encoding); + } + if send_encodings.is_enabled(encoding) { + this = this.send_compressed(encoding); + } } this } - #[cfg(not(feature = "compression"))] - #[doc(hidden)] - #[allow(unused_variables)] - pub fn apply_compression_config(self, accept_encodings: (), send_encodings: ()) -> Self { - self - } - /// Handle a single unary gRPC request. pub async fn unary( &mut self, @@ -175,7 +145,6 @@ where B: Body + Send + 'static, B::Error: Into + Send, { - #[cfg(feature = "compression")] let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, @@ -187,9 +156,7 @@ where return self .map_response::>>>( Err(status), - #[cfg(feature = "compression")] accept_encoding, - #[cfg(feature = "compression")] SingleMessageCompressionOverride::default(), ); } @@ -200,16 +167,9 @@ where .await .map(|r| r.map(|m| stream::once(future::ok(m)))); - #[cfg(feature = "compression")] let compression_override = compression_override_from_response(&response); - self.map_response( - response, - #[cfg(feature = "compression")] - accept_encoding, - #[cfg(feature = "compression")] - compression_override, - ) + self.map_response(response, accept_encoding, compression_override) } /// Handle a server side streaming request. @@ -224,7 +184,6 @@ where B: Body + Send + 'static, B::Error: Into + Send, { - #[cfg(feature = "compression")] let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, @@ -235,9 +194,7 @@ where Err(status) => { return self.map_response::( Err(status), - #[cfg(feature = "compression")] accept_encoding, - #[cfg(feature = "compression")] SingleMessageCompressionOverride::default(), ); } @@ -247,11 +204,9 @@ where self.map_response( response, - #[cfg(feature = "compression")] accept_encoding, // disabling compression of individual stream items must be done on // the items themselves - #[cfg(feature = "compression")] SingleMessageCompressionOverride::default(), ) } @@ -267,7 +222,6 @@ where B: Body + Send + 'static, B::Error: Into + Send + 'static, { - #[cfg(feature = "compression")] let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, @@ -280,16 +234,9 @@ where .await .map(|r| r.map(|m| stream::once(future::ok(m)))); - #[cfg(feature = "compression")] let compression_override = compression_override_from_response(&response); - self.map_response( - response, - #[cfg(feature = "compression")] - accept_encoding, - #[cfg(feature = "compression")] - compression_override, - ) + self.map_response(response, accept_encoding, compression_override) } /// Handle a bi-directional streaming gRPC request. @@ -304,7 +251,6 @@ where B: Body + Send + 'static, B::Error: Into + Send, { - #[cfg(feature = "compression")] let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, @@ -316,9 +262,7 @@ where self.map_response( response, - #[cfg(feature = "compression")] accept_encoding, - #[cfg(feature = "compression")] SingleMessageCompressionOverride::default(), ) } @@ -331,18 +275,13 @@ where B: Body + Send + 'static, B::Error: Into + Send, { - #[cfg(feature = "compression")] let request_compression_encoding = self.request_encoding_if_supported(&request)?; let (parts, body) = request.into_parts(); - #[cfg(feature = "compression")] let stream = Streaming::new_request(self.codec.decoder(), body, request_compression_encoding); - #[cfg(not(feature = "compression"))] - let stream = Streaming::new_request(self.codec.decoder(), body); - futures_util::pin_mut!(stream); let message = stream @@ -367,24 +306,19 @@ where B: Body + Send + 'static, B::Error: Into + Send, { - #[cfg(feature = "compression")] let encoding = self.request_encoding_if_supported(&request)?; - #[cfg(feature = "compression")] let request = request.map(|body| Streaming::new_request(self.codec.decoder(), body, encoding)); - #[cfg(not(feature = "compression"))] - let request = request.map(|body| Streaming::new_request(self.codec.decoder(), body)); - Ok(Request::from_http(request)) } fn map_response( &mut self, response: Result, Status>, - #[cfg(feature = "compression")] accept_encoding: Option, - #[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride, + accept_encoding: Option, + compression_override: SingleMessageCompressionOverride, ) -> http::Response where B: TryStream + Send + 'static, @@ -402,7 +336,6 @@ where http::header::HeaderValue::from_static("application/grpc"), ); - #[cfg(feature = "compression")] if let Some(encoding) = accept_encoding { // Set the content encoding parts.headers.insert( @@ -414,16 +347,13 @@ where let body = encode_server( self.codec.encoder(), body.into_stream(), - #[cfg(feature = "compression")] accept_encoding, - #[cfg(feature = "compression")] compression_override, ); http::Response::from_parts(parts, BoxBody::new(body)) } - #[cfg(feature = "compression")] fn request_encoding_if_supported( &self, request: &http::Request, @@ -441,13 +371,11 @@ impl fmt::Debug for Grpc { f.field("codec", &self.codec); - #[cfg(feature = "compression")] f.field( "accept_compression_encodings", &self.accept_compression_encodings, ); - #[cfg(feature = "compression")] f.field( "send_compression_encodings", &self.send_compression_encodings, @@ -457,7 +385,6 @@ impl fmt::Debug for Grpc { } } -#[cfg(feature = "compression")] fn compression_override_from_response( res: &Result, E>, ) -> SingleMessageCompressionOverride {