Skip to content

Commit

Permalink
feat(tonic): Add client feature flag to support connect_with_connecto…
Browse files Browse the repository at this point in the history
…r in wasm32 targets

This adds a new feature that enables compilation of connect_with_connector and the Endpoint and Channel struct for wasm.

Related: hyperium#491
  • Loading branch information
lucasmerlin committed Jan 9, 2024
1 parent 177c1f3 commit 837c778
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 120 deletions.
18 changes: 16 additions & 2 deletions tonic/Cargo.toml
Expand Up @@ -38,9 +38,20 @@ transport = [
"channel",
"dep:h2",
"dep:hyper",
"hyper/full",
"dep:tokio",
"tokio/net",
"tokio/time",
"dep:tower",
"tower/balance",
"dep:hyper-timeout",
]
client = [
"dep:h2",
"hyper/client",
"hyper/http2",
"dep:tokio",
"dep:tower",
"dep:hyper-timeout",
]
channel = []
Expand All @@ -55,7 +66,7 @@ bytes = "1.0"
http = "0.2"
tracing = "0.1"

tokio = "1.0.1"
tokio = { version = "1.0.1", default-features = false, optional = true }
http-body = "0.4.4"
percent-encoding = "2.1"
pin-project = "1.0.11"
Expand All @@ -70,7 +81,7 @@ async-trait = {version = "0.1.13", optional = true}

# transport
h2 = {version = "0.3.17", optional = true}
hyper = {version = "0.14.26", features = ["full"], optional = true}
hyper = {version = "0.14.26", default-features = false, optional = true}
hyper-timeout = {version = "0.4", optional = true}
tokio-stream = "0.1"
tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true}
Expand All @@ -88,6 +99,9 @@ webpki-roots = { version = "0.25.0", optional = true }
flate2 = {version = "1.0", optional = true}
zstd = { version = "0.12.3", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4.38"

[dev-dependencies]
bencher = "0.1.5"
quickcheck = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/lib.rs
Expand Up @@ -101,7 +101,7 @@ pub mod metadata;
pub mod server;
pub mod service;

#[cfg(feature = "transport")]
#[cfg(any(feature = "transport", feature = "client"))]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
pub mod transport;

Expand Down
4 changes: 3 additions & 1 deletion tonic/src/transport/channel/endpoint.rs
Expand Up @@ -312,6 +312,7 @@ impl Endpoint {
}

/// Create a channel from this config.
#[cfg(feature = "transport")]
pub async fn connect(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
http.enforce_http(false);
Expand All @@ -333,6 +334,7 @@ impl Endpoint {
///
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
#[cfg(feature = "transport")]
pub fn connect_lazy(&self) -> Channel {
let mut http = hyper::client::connect::HttpConnector::new();
http.enforce_http(false);
Expand Down Expand Up @@ -428,7 +430,7 @@ impl From<Uri> for Endpoint {
http2_keep_alive_while_idle: None,
connect_timeout: None,
http2_adaptive_window: None,
executor: SharedExec::tokio(),
executor: SharedExec::default_exec(),
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions tonic/src/transport/channel/mod.rs
Expand Up @@ -9,7 +9,9 @@ pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;

use super::service::{Connection, DynamicServiceStream, SharedExec};
use super::service::{Connection};
#[cfg(feature = "transport")]
use super::service::{DynamicServiceStream, SharedExec};
use crate::body::BoxBody;
use crate::transport::Executor;
use bytes::Bytes;
Expand Down Expand Up @@ -109,6 +111,7 @@ impl Channel {
///
/// This creates a [`Channel`] that will load balance across all the
/// provided endpoints.
#[cfg(feature = "transport")]
pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
Expand All @@ -122,18 +125,20 @@ impl Channel {
/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
#[cfg(feature = "transport")]
pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
where
K: Hash + Eq + Send + Clone + 'static,
{
Self::balance_channel_with_executor(capacity, SharedExec::tokio())
Self::balance_channel_with_executor(capacity, SharedExec::default_exec())
}

/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
///
/// The [`Channel`] will use the given executor to spawn async tasks.
#[cfg(feature = "transport")]
pub fn balance_channel_with_executor<K, E>(
capacity: usize,
executor: E,
Expand Down
3 changes: 3 additions & 0 deletions tonic/src/transport/mod.rs
Expand Up @@ -88,6 +88,7 @@
//! [rustls]: https://docs.rs/rustls/0.16.0/rustls/

pub mod channel;
#[cfg(feature = "transport")]
pub mod server;

mod error;
Expand All @@ -100,13 +101,15 @@ mod tls;
#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
pub use self::channel::{Channel, Endpoint};
pub use self::error::Error;
#[cfg(feature = "transport")]
#[doc(inline)]
pub use self::server::Server;
#[doc(inline)]
pub use self::service::grpc_timeout::TimeoutExpired;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Certificate;
#[cfg(feature = "transport")]
pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter};
pub use hyper::{Body, Uri};

Expand Down
67 changes: 49 additions & 18 deletions tonic/src/transport/service/connection.rs
@@ -1,26 +1,29 @@
use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent};
use crate::{
body::BoxBody,
transport::{BoxFuture, Endpoint},
use std::{
fmt,
task::{Context, Poll},
};

use http::Uri;
use hyper::client::conn::Builder;
use hyper::client::connect::Connection as HyperConnection;
use hyper::client::service::Connect as HyperConnect;
use std::{
fmt,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tower::load::Load;
use tower::{
layer::Layer,
limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
util::BoxService,
ServiceBuilder, ServiceExt,
ServiceBuilder,
ServiceExt, util::BoxService,
};
use tower::load::Load;
use tower_service::Service;

use crate::{
body::BoxBody,
transport::{BoxFuture, Endpoint},
};

use super::{AddOrigin, grpc_timeout::GrpcTimeout, reconnect::Reconnect, UserAgent};

pub(crate) type Request = http::Request<BoxBody>;
pub(crate) type Response = http::Response<hyper::Body>;

Expand All @@ -40,20 +43,32 @@ impl Connection {
.http2_initial_stream_window_size(endpoint.init_stream_window_size)
.http2_initial_connection_window_size(endpoint.init_connection_window_size)
.http2_only(true)
.http2_keep_alive_interval(endpoint.http2_keep_alive_interval)
.executor(endpoint.executor.clone())
.clone();

if let Some(val) = endpoint.http2_keep_alive_timeout {
settings.http2_keep_alive_timeout(val);
if let Some(val) = endpoint.http2_adaptive_window {
settings.http2_adaptive_window(val);
}

if let Some(val) = endpoint.http2_keep_alive_while_idle {
settings.http2_keep_alive_while_idle(val);
#[cfg(feature = "transport")]
{
settings
.http2_keep_alive_interval(endpoint.http2_keep_alive_interval);

if let Some(val) = endpoint.http2_keep_alive_timeout {
settings.http2_keep_alive_timeout(val);
}

if let Some(val) = endpoint.http2_keep_alive_while_idle {
settings.http2_keep_alive_while_idle(val);
}
}

if let Some(val) = endpoint.http2_adaptive_window {
settings.http2_adaptive_window(val);
#[cfg(target_arch = "wasm32")]
{
settings.executor(wasm::Executor)
// reset streams require `Instant::now` which is not available on wasm
.http2_max_concurrent_reset_streams(0);
}

let stack = ServiceBuilder::new()
Expand Down Expand Up @@ -126,3 +141,19 @@ impl fmt::Debug for Connection {
f.debug_struct("Connection").finish()
}
}

#[cfg(target_arch = "wasm32")]
mod wasm {
use std::future::Future;
use std::pin::Pin;

type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

pub(crate) struct Executor;

impl hyper::rt::Executor<BoxSendFuture> for Executor {
fn execute(&self, fut: BoxSendFuture) {
wasm_bindgen_futures::spawn_local(fut)
}
}
}
24 changes: 22 additions & 2 deletions tonic/src/transport/service/executor.rs
Expand Up @@ -3,9 +3,11 @@ use std::{future::Future, sync::Arc};

pub(crate) use hyper::rt::Executor;

#[cfg(not(target_arch = "wasm32"))]
#[derive(Copy, Clone)]
struct TokioExec;

#[cfg(not(target_arch = "wasm32"))]
impl<F> Executor<F> for TokioExec
where
F: Future + Send + 'static,
Expand All @@ -16,6 +18,21 @@ where
}
}

#[cfg(target_arch = "wasm32")]
#[derive(Copy, Clone)]
struct WasmBindgenExec;

#[cfg(target_arch = "wasm32")]
impl<F> Executor<F> for WasmBindgenExec
where
F: Future + 'static,
F::Output: 'static,
{
fn execute(&self, fut: F) {
wasm_bindgen_futures::spawn_local(async move {fut.await;});
}
}

#[derive(Clone)]
pub(crate) struct SharedExec {
inner: Arc<dyn Executor<BoxFuture<'static, ()>> + Send + Sync + 'static>,
Expand All @@ -31,8 +48,11 @@ impl SharedExec {
}
}

pub(crate) fn tokio() -> Self {
Self::new(TokioExec)
pub(crate) fn default_exec() -> Self {
#[cfg(not(target_arch = "wasm32"))]
return Self::new(TokioExec);
#[cfg(target_arch = "wasm32")]
Self::new(WasmBindgenExec)
}
}

Expand Down

0 comments on commit 837c778

Please sign in to comment.