Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Reduce how much code gets monomorphized #1032

Merged
merged 15 commits into from Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
149 changes: 92 additions & 57 deletions tonic/src/client/grpc.rs
Expand Up @@ -2,7 +2,7 @@ use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings
use crate::{
body::BoxBody,
client::GrpcService,
codec::{encode_client, Codec, Streaming},
codec::{encode_client, Codec, Decoder, Streaming},
request::SanitizeHeaders,
Code, Request, Response, Status,
};
Expand Down Expand Up @@ -30,6 +30,10 @@ use std::fmt;
/// [gRPC protocol definition]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
pub struct Grpc<T> {
inner: T,
config: GrpcConfig,
}

struct GrpcConfig {
origin: Uri,
/// Which compression encodings does the client accept?
accept_compression_encodings: EnabledCompressionEncodings,
Expand All @@ -40,12 +44,7 @@ pub struct Grpc<T> {
impl<T> Grpc<T> {
/// Creates a new gRPC client with the provided [`GrpcService`].
pub fn new(inner: T) -> Self {
Self {
inner,
origin: Uri::default(),
send_compression_encodings: None,
accept_compression_encodings: EnabledCompressionEncodings::default(),
}
Self::with_origin(inner, Uri::default())
}

/// Creates a new gRPC client with the provided [`GrpcService`] and `Uri`.
Expand All @@ -55,9 +54,11 @@ impl<T> Grpc<T> {
pub fn with_origin(inner: T, origin: Uri) -> Self {
Self {
inner,
origin,
send_compression_encodings: None,
accept_compression_encodings: EnabledCompressionEncodings::default(),
config: GrpcConfig {
origin,
send_compression_encodings: None,
accept_compression_encodings: EnabledCompressionEncodings::default(),
},
}
}

Expand Down Expand Up @@ -88,7 +89,7 @@ impl<T> Grpc<T> {
/// # };
/// ```
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings = Some(encoding);
self.config.send_compression_encodings = Some(encoding);
self
}

Expand Down Expand Up @@ -119,7 +120,7 @@ impl<T> Grpc<T> {
/// # };
/// ```
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self.config.accept_compression_encodings.enable(encoding);
self
}

Expand Down Expand Up @@ -226,6 +227,73 @@ impl<T> Grpc<T> {
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let request = request
.map(|s| encode_client(codec.encoder(), s, self.config.send_compression_encodings))
.map(BoxBody::new);

let request = self.config.prepare_request(request, path);

let response = self
.inner
.call(request)
.await
.map_err(Status::from_error_generic)?;

let decoder = codec.decoder();

self.create_response(decoder, response)
}

// Keeping this code in a separate function from Self::streaming lets functions that return the
// same output share the generated binary code
fn create_response<M2>(
&self,
decoder: impl Decoder<Item = M2, Error = Status> + Send + 'static,
response: http::Response<T::ResponseBody>,
) -> Result<Response<Streaming<M2>>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + Send + 'static,
<T::ResponseBody as Body>::Error: Into<crate::Error>,
{
let encoding = CompressionEncoding::from_encoding_header(
response.headers(),
self.config.accept_compression_encodings,
)?;

let status_code = response.status();
let trailers_only_status = Status::from_header_map(response.headers());

// We do not need to check for trailers if the `grpc-status` header is present
// with a valid code.
let expect_additional_trailers = if let Some(status) = trailers_only_status {
if status.code() != Code::Ok {
return Err(status);
}

false
} else {
true
};

let response = response.map(|body| {
if expect_additional_trailers {
Streaming::new_response(decoder, body, status_code, encoding)
} else {
Streaming::new_empty(decoder, body)
}
});

Ok(Response::from_http(response))
}
}

impl GrpcConfig {
fn prepare_request(
&self,
request: Request<http_body::combinators::UnsyncBoxBody<bytes::Bytes, Status>>,
path: PathAndQuery,
) -> http::Request<http_body::combinators::UnsyncBoxBody<bytes::Bytes, Status>> {
let scheme = self.origin.scheme().cloned();
let authority = self.origin.authority().cloned();

Expand All @@ -236,10 +304,6 @@ impl<T> Grpc<T> {

let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");

let request = request
.map(|s| encode_client(codec.encoder(), s, self.send_compression_encodings))
.map(BoxBody::new);

let mut request = request.into_http(
uri,
http::Method::POST,
Expand Down Expand Up @@ -274,51 +338,19 @@ impl<T> Grpc<T> {
);
}

let response = self
.inner
.call(request)
.await
.map_err(|err| Status::from_error(err.into()))?;

let encoding = CompressionEncoding::from_encoding_header(
response.headers(),
self.accept_compression_encodings,
)?;

let status_code = response.status();
let trailers_only_status = Status::from_header_map(response.headers());

// We do not need to check for trailers if the `grpc-status` header is present
// with a valid code.
let expect_additional_trailers = if let Some(status) = trailers_only_status {
if status.code() != Code::Ok {
return Err(status);
}

false
} else {
true
};

let response = response.map(|body| {
if expect_additional_trailers {
Streaming::new_response(codec.decoder(), body, status_code, encoding)
} else {
Streaming::new_empty(codec.decoder(), body)
}
});

Ok(Response::from_http(response))
request
}
}

impl<T: Clone> Clone for Grpc<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
origin: self.origin.clone(),
send_compression_encodings: self.send_compression_encodings,
accept_compression_encodings: self.accept_compression_encodings,
config: GrpcConfig {
origin: self.config.origin.clone(),
send_compression_encodings: self.config.send_compression_encodings,
accept_compression_encodings: self.config.accept_compression_encodings,
},
}
}
}
Expand All @@ -329,13 +361,16 @@ impl<T: fmt::Debug> fmt::Debug for Grpc<T> {

f.field("inner", &self.inner);

f.field("origin", &self.origin);
f.field("origin", &self.config.origin);

f.field("compression_encoding", &self.send_compression_encodings);
f.field(
"compression_encoding",
&self.config.send_compression_encodings,
);

f.field(
"accept_compression_encodings",
&self.accept_compression_encodings,
&self.config.accept_compression_encodings,
);

f.finish()
Expand Down