Skip to content

Commit

Permalink
Update the proxy to use Tokio 0.3 (#732)
Browse files Browse the repository at this point in the history
This change updates the proxy to use Tokio 0.3 and the Tokio 0.3
versions of various ecosystem crates. This includes `tower` 0.4 and
`bytes` 0.6, as well as the Tokio 0.3 versions of `tokio-util`, `hyper`,
`tonic`, etc. Due to API changes in Tokio and in other dependencies, it
was necessary to make some code changes as well as updating
dependencies, but there should be no functional change.

In particular:
* Tokio's support for vectored IO changed significantly in 0.3, so this
  branch updates our use of `AsyncWrite` to participate in the new
  vectored write APIs
* Hyper's HTTP/1.1 upgrade API changed in 0.14, so this branch changes the
  proxy's code for handling CONNECT to use the new API
* Tokio removed support for some socket options, which now need to be
  set using `socket2`
* Tokio removed the `poll_ready` method was removed from the bounded
  MPSC channel, so the proxy's buffers (`linkerd2-buffer` and the
  `buffer` module in `linkerd2-proxy-discover`) had to be switched to
  our own implementation (this merged separately, in PR #759).

Several ecosystem crates have yet to be released, so we depend on them
via Git dependencies for now. The patches in Cargo.toml can be
removed as other dependencies publish their Tokio 0.3 versions.
  • Loading branch information
hawkw committed Dec 4, 2020
1 parent 1d64a11 commit c657b3e
Show file tree
Hide file tree
Showing 105 changed files with 989 additions and 1,072 deletions.
737 changes: 358 additions & 379 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion Cargo.toml
Expand Up @@ -63,4 +63,19 @@ debug = false

[patch.crates-io]
webpki = { git = "https://github.com/linkerd/webpki", branch = "cert-dns-names-0.21", rev = "b2c3bb3" }
tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "ad348d8" }
# TODO(eliza): when the Tokio 0.3 versions of our various dependencies are
# published, remove these patches...
tower = { version = "0.4", git = "https://github.com/tower-rs/tower", rev = "450fa3d2be2b43850ceb125009d636d1d8629ad7" }
hyper = { git = "https://github.com/hyperium/hyper", rev = "d6aadb830072959497f414c01bcdba4c8e681088" }
# this isn't pinned to a rev because it needs to be consistent with Hyper's git
# dep (which is not pinned).
h2 = { git = "https://github.com/hyperium/h2" }
# this isn't pinned to a rev because it needs to be consistent with Hyper's git
# dep (which is `branch = "master"`).
http-body = { git = "https://github.com/hyperium/http-body", branch = "master" }
tokio-rustls = { git = "https://github.com/tokio-rs/tls", rev = "e6ef54641b911cfcf23b77a8c4826ae0f8e9870e" }
tonic = { git = "https://github.com/hawkw/tonic", branch = "eliza/tokio-0.3" }
tonic-build = { git = "https://github.com/hawkw/tonic", branch = "eliza/tokio-0.3" }
prost = { git = "https://github.com/danburkert/prost" }
prost-build = { git = "https://github.com/danburkert/prost" }
prost-types = { git = "https://github.com/danburkert/prost" }
6 changes: 3 additions & 3 deletions hyper-balance/Cargo.toml
Expand Up @@ -8,10 +8,10 @@ publish = false
[dependencies]
futures = "0.3"
http = "0.2"
hyper = "0.13.7"
hyper = "0.14.0-dev"
pin-project = "0.4"
tower = { version = "0.3", default-features = false, features = ["load"]}
tokio = { version = "0.2", features = ["macros"]}
tower = { version = "0.4", default-features = false, features = ["load"]}
tokio = { version = "0.3", features = ["macros"]}

[dev-dependencies]
tokio-test = "0.2"
10 changes: 5 additions & 5 deletions linkerd/app/Cargo.toml
Expand Up @@ -24,8 +24,8 @@ linkerd2-app-outbound = { path = "./outbound" }
linkerd2-opencensus = { path = "../opencensus" }
linkerd2-error = { path = "../error" }
regex = "1.0.0"
tokio = { version = "0.2", features = ["rt-util"] }
tonic = { version = "0.3", default-features = false }
tower = "0.3"
tracing = "0.1.19"
tracing-futures = { version = "0.2", features = ["std-future"]}
tokio = { version = "0.3", features = ["rt"] }
tonic = { version = "0.3", default-features = false, features = ["prost"] }
tower = "0.4"
tracing = "0.1.22"
tracing-futures = { version = "0.2", features = ["std-future"]}
18 changes: 9 additions & 9 deletions linkerd/app/core/Cargo.toml
Expand Up @@ -15,10 +15,10 @@ independently of the inbound and outbound proxy logic.
mock-orig-dst = ["linkerd2-proxy-transport/mock-orig-dst"]

[dependencies]
bytes = "0.5"
bytes = "0.6"
http = "0.2"
http-body = "0.3"
hyper = "0.13.7"
http-body = "0.4"
hyper = "0.14.0-dev"
futures = "0.3"
indexmap = "1.0"
ipnet = "1.0"
Expand All @@ -40,7 +40,7 @@ linkerd2-http-metrics = { path = "../../http-metrics" }
linkerd2-metrics = { path = "../../metrics" }
linkerd2-opencensus = { path = "../../opencensus" }
linkerd2-proxy-core = { path = "../../proxy/core" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "259628840ba613c2e5673fc6a39b946e1b06f09a" }
linkerd2-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd2-proxy-discover = { path = "../../proxy/discover" }
linkerd2-proxy-identity = { path = "../../proxy/identity" }
Expand All @@ -60,16 +60,16 @@ linkerd2-stack-metrics = { path = "../../stack/metrics" }
linkerd2-stack-tracing = { path = "../../stack/tracing" }
linkerd2-trace-context = { path = "../../trace-context" }
regex = "1.0.0"
tokio = { version = "0.2.22", features = ["macros", "sync", "parking_lot"]}
tokio = { version = "0.3", features = ["macros", "sync", "parking_lot"]}
tokio-timer = "0.2"
tower-request-modifier = { git = "https://github.com/tower-rs/tower-http" }
tower-request-modifier = { git = "https://github.com/tower-rs/tower-http", rev = "bd7a4654bdc4e2b5363572e9f66b4dbbc7c0e1ea" }
tonic = { version = "0.3", default-features = false, features = ["prost"] }
tracing = "0.1.19"
tracing = "0.1.22"
tracing-futures = { version = "0.2" }
pin-project = "0.4"

[dependencies.tower]
version = "0.3"
version = "0.4"
# disable tower's tracing `log` integration for performance reasons, since we
# will consume tower's traces as traces.
default-features = false
Expand All @@ -85,5 +85,5 @@ libc = "0.2"
procinfo = "0.4.2"

[dev-dependencies]
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15", features = ["arbitrary"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "259628840ba613c2e5673fc6a39b946e1b06f09a", features = ["arbitrary"] }
prost-types = "0.6.0"
2 changes: 1 addition & 1 deletion linkerd/app/core/src/control.rs
Expand Up @@ -34,7 +34,7 @@ impl fmt::Display for ControlAddr {
}

type BalanceBody =
http::balance::PendingUntilFirstDataBody<tower::load::peak_ewma::Handle, http::Payload>;
http::balance::PendingUntilFirstDataBody<tower::load::peak_ewma::Handle, hyper::Body>;

type RspBody = linkerd2_http_metrics::requests::ResponseBody<BalanceBody, classify::Eos>;

Expand Down
9 changes: 5 additions & 4 deletions linkerd/app/gateway/Cargo.toml
Expand Up @@ -12,11 +12,12 @@ indexmap = "1.0"
linkerd2-app-core = { path = "../core" }
linkerd2-app-inbound = { path = "../inbound" }
linkerd2-app-outbound = { path = "../outbound" }
tower = { version = "0.3", default-features = false }
tracing = "0.1.19"
tower = { version = "0.4", default-features = false }
tracing = "0.1.22"

[dev-dependencies]
tokio = { version = "0.2", features = ["rt-core", "macros"] }
tokio = { version = "0.3", features = ["rt", "macros"] }
tokio-test = "0.2"
tower = { version = "0.3", default-features = false, features = ["util"] }
tower = { version = "0.4", default-features = false, features = ["util"] }
tower-test = "0.3"
linkerd2-app-test = { path = "../test" }
4 changes: 2 additions & 2 deletions linkerd/app/gateway/src/lib.rs
Expand Up @@ -19,6 +19,7 @@ mod test {
Error, NameAddr, NameMatch, Never,
};
use linkerd2_app_inbound::endpoint as inbound;
use linkerd2_app_test as support;
use std::{net::SocketAddr, str::FromStr};
use tower::util::{service_fn, ServiceExt};
use tower_test::mock;
Expand Down Expand Up @@ -133,11 +134,10 @@ mod test {
>();
let mut make_gateway = {
let profiles = service_fn(move |na: NameAddr| async move {
let (mut tx, rx) = tokio::sync::watch::channel(profiles::Profile {
let rx = support::profile::only(profiles::Profile {
name: Some(na.name().clone()),
..profiles::Profile::default()
});
tokio::spawn(async move { tx.closed().await });
Ok::<_, Never>(Some(rx))
});
let allow_discovery = NameMatch::new(Some(dns::Suffix::from_str(suffix).unwrap()));
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/inbound/Cargo.toml
Expand Up @@ -9,16 +9,16 @@ Configures and runs the inbound proxy
"""

[dependencies]
bytes = "0.5"
bytes = "0.6"
http = "0.2"
futures = { version = "0.3" }
indexmap = "1.0"
linkerd2-app-core = { path = "../core" }
tokio = { version = "0.2", features = ["sync"] }
tracing = "0.1.19"
tokio = { version = "0.3", features = ["sync"] }
tracing = "0.1.22"

[dependencies.tower]
version = "0.3"
version = "0.4"
# disable tower's tracing `log` integration for performance reasons, since we
# will consume tower's traces as traces.
default-features = false
Expand Down
20 changes: 10 additions & 10 deletions linkerd/app/integration/Cargo.toml
Expand Up @@ -18,25 +18,25 @@ flaky_tests = []
nyi = []

[dependencies]
bytes = "0.5"
futures = "0.3"
h2 = "0.2.6"
bytes = "0.6"
futures = "0.3"
h2 = "0.3"
http = "0.2"
http-body = "0.3"
hyper = "0.13.7"
http-body = "0.4"
hyper = "0.14.0-dev"
linkerd2-app = { path = "..", features = ["mock-orig-dst"] }
linkerd2-app-core = { path = "../core", features = ["mock-orig-dst"] }
linkerd2-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15", features = ["arbitrary"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "259628840ba613c2e5673fc6a39b946e1b06f09a", features = ["arbitrary"] }
linkerd2-app-test = { path = "../test" }
regex = "0.1"
socket2 = "0.3.12"
rustls = "0.18"
tokio = { version = "0.2", features = ["io-util", "net", "rt-core"]}
tokio-rustls = "0.14.1"
tower = { version = "0.3", default-features = false}
tokio = { version = "0.3", features = ["io-util", "net", "rt", "macros"]}
tokio-rustls = "0.20"
tower = { version = "0.4", default-features = false}
tonic = { version = "0.3", default-features = false }
tracing = "0.1.19"
tracing = "0.1.22"
tracing-futures = { version = "0.2", features = ["std-future"] }
webpki = "0.21"

Expand Down
3 changes: 2 additions & 1 deletion linkerd/app/integration/src/client.rs
@@ -1,4 +1,5 @@
use super::*;
use hyper::body::Buf;
use linkerd2_app_core::proxy::http::trace;
use rustls::ClientConfig;
use std::io;
Expand Down Expand Up @@ -121,7 +122,7 @@ impl Client {
);
let stream = res.into_parts().1;
let mut body = hyper::body::aggregate(stream).await.expect("wait body");
std::str::from_utf8(body.to_bytes().as_ref())
std::str::from_utf8(body.copy_to_bytes(body.remaining()).as_ref())
.unwrap()
.to_string()
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/controller.rs
Expand Up @@ -362,7 +362,7 @@ where
let task = tokio::spawn(
cancelable(drain.clone(), async move {
// Start listening on the socket.
let mut listener = crate::listen(sock);
let listener = crate::listen(sock);
let mut listening_tx = Some(listening_tx);

if let Some(delay) = delay {
Expand Down
20 changes: 9 additions & 11 deletions linkerd/app/integration/src/lib.rs
Expand Up @@ -30,7 +30,7 @@ use std::pin::Pin;
pub use std::sync::Arc;
use std::task::{Context, Poll};
pub use std::time::Duration;
pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::TcpListener;
pub use tokio::stream::{Stream, StreamExt};
pub use tokio::sync::oneshot;
Expand Down Expand Up @@ -69,7 +69,7 @@ macro_rules! assert_eventually {
use std::{env, u64};
use std::time::{Instant, Duration};
use std::str::FromStr;
use tracing_futures::Instrument as _;
use tracing::Instrument as _;
// TODO: don't do this *every* time eventually is called (lazy_static?)
let patience = env::var($crate::ENV_TEST_PATIENCE_MS).ok()
.map(|s| {
Expand All @@ -96,7 +96,7 @@ macro_rules! assert_eventually {
)
} else {
tracing::trace!("waiting...");
tokio::time::delay_for(patience).await;
tokio::time::sleep(patience).await;
std::thread::yield_now();
tracing::trace!("done");
}
Expand Down Expand Up @@ -165,14 +165,10 @@ impl AsyncRead for RunningIo {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().io.as_mut().poll_read(cx, buf)
}

unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit<u8>]) -> bool {
self.io.prepare_uninitialized_buffer(buf)
}
}

impl AsyncWrite for RunningIo {
Expand Down Expand Up @@ -297,6 +293,8 @@ pub(crate) fn bind_ephemeral() -> (Socket, SocketAddr) {
pub(crate) fn listen(sock: Socket) -> TcpListener {
sock.listen(1024)
.expect("socket should be able to start listening");
TcpListener::from_std(sock.into_tcp_listener())
.expect("socket should be able to set nonblocking")
let sock = sock.into_tcp_listener();
sock.set_nonblocking(true)
.expect("socket should be able to set nonblocking");
TcpListener::from_std(sock).expect("socket should seem okay to tokio")
}
3 changes: 1 addition & 2 deletions linkerd/app/integration/src/proxy.rs
Expand Up @@ -313,9 +313,8 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
let span = info_span!("proxy", test = %thread_name());
let _enter = span.enter();

tokio::runtime::Builder::new()
tokio::runtime::Builder::new_current_thread()
.enable_all()
.basic_scheduler()
.build()
.expect("proxy")
.block_on(async move {
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/integration/src/server.rs
Expand Up @@ -169,7 +169,7 @@ impl Server {
self.route_async(path, move |_| {
let resp = resp.clone();
async move {
tokio::time::delay_for(latency).await;
tokio::time::sleep(latency).await;
Ok::<_, BoxError>(
http::Response::builder()
.status(200)
Expand Down Expand Up @@ -223,7 +223,7 @@ impl Server {
}

// After the delay, start listening on the socket.
let mut listener = crate::listen(sock);
let listener = crate::listen(sock);

if let Some(listening_tx) = listening_tx {
let _ = listening_tx.send(());
Expand Down
5 changes: 4 additions & 1 deletion linkerd/app/integration/src/tcp.rs
Expand Up @@ -212,7 +212,10 @@ async fn run_server(tcp: TcpServer) -> server::Listening {
let task = tokio::spawn(
cancelable(drain.clone(), async move {
let mut accepts = tcp.accepts;
let mut listener = TcpListener::from_std(std_listener).expect("TcpListener::from_std");
std_listener
.set_nonblocking(true)
.expect("socket must be able to set nonblocking");
let listener = TcpListener::from_std(std_listener).expect("TcpListener::from_std");

let _ = started_tx.send(());
loop {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/app/integration/src/tests/discovery.rs
Expand Up @@ -213,12 +213,13 @@ macro_rules! generate_tests {
.await
}

#[tokio::test]
#[tokio::test(flavor = "current_thread")]
async fn outbound_destinations_reset_on_reconnect_followed_by_dne() {
outbound_destinations_reset_on_reconnect(controller::destination_does_not_exist()).await
}

async fn outbound_destinations_reset_on_reconnect(up: pb::destination::Update) {
let _trace = trace_init();
let env = TestEnv::new();
let srv = $make_server().route("/", "hello").run().await;
let ctrl = controller::new();
Expand Down Expand Up @@ -250,7 +251,7 @@ macro_rules! generate_tests {
dst_tx1.send(up);

// Wait for the reconnect to happen. TODO: Replace this flaky logic.
tokio::time::delay_for(Duration::from_millis(1000)).await;
tokio::time::sleep(Duration::from_millis(1000)).await;

let rsp = initially_exists
.request(initially_exists.request_builder("/"))
Expand Down Expand Up @@ -320,12 +321,12 @@ macro_rules! generate_tests {
.await;

// Allow the control client to notice a connection error
tokio::time::delay_for(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(500)).await;

// Allow our controller to start accepting connections,
// and then wait a little bit so the client tries again.
drop(tx);
tokio::time::delay_for(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(500)).await;

let client = $make_client(proxy.outbound, "disco.test.svc.cluster.local");

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/identity.rs
Expand Up @@ -232,7 +232,7 @@ async fn refresh() {
let expiry = expiry_rx.await.expect("wait for expiry");
let how_long = expiry.duration_since(SystemTime::now()).unwrap();

tokio::time::delay_for(how_long).await;
tokio::time::sleep(how_long).await;

assert_eventually!(refreshed.load(Ordering::SeqCst) == true);
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/profile_dst_overrides.rs
Expand Up @@ -61,7 +61,7 @@ async fn wait_for_profile_stage(client: &client::Client, metrics: &client::Clien
break;
}

tokio::time::delay_for(std::time::Duration::from_millis(200)).await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/profiles.rs
Expand Up @@ -122,7 +122,7 @@ macro_rules! profile_test {
break;
}

tokio::time::delay_for(std::time::Duration::from_millis(200)).await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}

$with_client(client).await;
Expand Down

0 comments on commit c657b3e

Please sign in to comment.