Skip to content

Commit

Permalink
Refactor control::destination::background::client module (linkerd#38)
Browse files Browse the repository at this point in the history
This branch should not make any functional changes.

This branch makes two minor refactorings to the `client` module in 
`control::destination::background`:

 1. Remove the `AddOrigin` middleware and replace it with the 
    `tower-add-origin` crate from `tower-http`. These middlewares are
    functionally identical, but the Tower version has tests.
 2. Change `ClientService` from a type alias to a tuple struct. This
    means that some of the middleware that are used only in this module
    (`LogErrors` and `Backoff`) are no longer part of a publicly visible
    type and can be made private to the module.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Aug 4, 2018
1 parent ab1b280 commit 1774c87
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 55 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock
Expand Up @@ -506,6 +506,7 @@ dependencies = [
"tokio-rustls 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-signal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)",
"tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)",
Expand Down Expand Up @@ -1218,6 +1219,16 @@ dependencies = [
"tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "tower-add-origin"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower-http#c9e13f641a681b3ef01e96910789586e39aee2e2"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
]

[[package]]
name = "tower-balance"
version = "0.1.0"
Expand Down Expand Up @@ -1679,6 +1690,7 @@ dependencies = [
"checksum tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c3873a6d8d0b636e024e77b9a82eaab6739578a06189ecd0e731c7308fbc5d"
"checksum tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "028b94314065b90f026a21826cffd62a4e40a92cda3e5c069cc7b02e5945f5e9"
"checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a"
"checksum tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)" = "<none>"
"checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -47,6 +47,7 @@ tokio = "0.1.7"
tokio-signal = "0.2"
tokio-timer = "0.2.4" # for tokio_timer::clock
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tower-add-origin = { git = "https://github.com/tower-rs/tower-http" }
tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
Expand Down
95 changes: 40 additions & 55 deletions src/control/destination/background/client.rs
Expand Up @@ -10,17 +10,22 @@ use h2;
use http;
use tokio::timer::Delay;

use tower_h2::{self, BoxBody};
use tower_h2::{self, BoxBody, RecvBody};
use tower_add_origin::AddOrigin;
use tower_service::Service;
use tower_reconnect::{Reconnect, Error as ReconnectError};
use tower_reconnect::{
Reconnect,
Error as ReconnectError,
ResponseFuture as ReconnectFuture,
};
use conditional::Conditional;
use dns;
use timeout::{Timeout, TimeoutError};
use transport::{tls, HostAndPort, LookupAddressAndConnect};
use watch_service::Rebind;

/// Type of the client service stack used to make destination requests.
pub(super) type ClientService = AddOrigin<Backoff<LogErrors<Reconnect<
pub(super) struct ClientService(AddOrigin<Backoff<LogErrors<Reconnect<
tower_h2::client::Connect<
Timeout<LookupAddressAndConnect>,
::logging::ContextualExecutor<
Expand All @@ -31,7 +36,7 @@ pub(super) type ClientService = AddOrigin<Backoff<LogErrors<Reconnect<
>,
BoxBody,
>
>>>>;
>>>>);

/// The state needed to bind a new controller client stack.
pub(super) struct BindClient {
Expand All @@ -44,23 +49,15 @@ pub(super) struct BindClient {

/// Wait a duration if inner `poll_ready` returns an error.
//TODO: move to tower-backoff
pub(super) struct Backoff<S> {
struct Backoff<S> {
inner: S,
timer: Delay,
waiting: bool,
wait_dur: Duration,
}


/// Wraps an HTTP service, injecting authority and scheme on every request.
pub(super) struct AddOrigin<S> {
authority: http::uri::Authority,
inner: S,
scheme: http::uri::Scheme,
}

/// Log errors talking to the controller in human format.
pub(super) struct LogErrors<S> {
struct LogErrors<S> {
inner: S,
}

Expand All @@ -80,6 +77,31 @@ type LogError = ReconnectError<
>
>;

// ===== impl ClientService =====

impl Service for ClientService {
type Request = http::Request<BoxBody>;
type Response = http::Response<RecvBody>;
type Error = LogError;
type Future = ReconnectFuture<
tower_h2::client::Connect<
Timeout<LookupAddressAndConnect>,
::logging::ContextualExecutor<
::logging::Client<
&'static str,
HostAndPort,
>
>, BoxBody>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}

fn call(&mut self, request: Self::Request) -> Self::Future {
self.0.call(request)
}
}

// ===== impl BindClient =====

impl BindClient {
Expand Down Expand Up @@ -133,8 +155,7 @@ impl Rebind<tls::ConditionalClientConfig> for BindClient {
let reconnect = Reconnect::new(h2_client);
let log_errors = LogErrors::new(reconnect);
let backoff = Backoff::new(log_errors, self.backoff_delay);
// TODO: Use AddOrigin in tower-http
AddOrigin::new(scheme, authority, backoff)
ClientService(AddOrigin::new(backoff, scheme, authority))
}

}
Expand All @@ -145,7 +166,7 @@ impl<S> Backoff<S>
where
S: Service,
{
pub(super) fn new(inner: S, wait_dur: Duration) -> Self {
fn new(inner: S, wait_dur: Duration) -> Self {
Backoff {
inner,
timer: Delay::new(Instant::now() + wait_dur),
Expand Down Expand Up @@ -207,7 +228,7 @@ impl<S> LogErrors<S>
where
S: Service<Error=LogError>,
{
pub(super) fn new(service: S) -> Self {
fn new(service: S) -> Self {
LogErrors {
inner: service,
}
Expand Down Expand Up @@ -235,7 +256,7 @@ where
}
}

pub(super) struct HumanError<'a>(&'a LogError);
struct HumanError<'a>(&'a LogError);

impl<'a> fmt::Display for HumanError<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand All @@ -256,42 +277,6 @@ impl<'a> fmt::Display for HumanError<'a> {
}
}

// ===== impl AddOrigin =====

impl<S> AddOrigin<S> {
pub(super) fn new(scheme: http::uri::Scheme, auth: http::uri::Authority, service: S) -> Self {
AddOrigin {
authority: auth,
inner: service,
scheme,
}
}
}

impl<S, B> Service for AddOrigin<S>
where
S: Service<Request = http::Request<B>>,
{
type Request = http::Request<B>;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}

fn call(&mut self, req: Self::Request) -> Self::Future {
let (mut head, body) = req.into_parts();
let mut uri: http::uri::Parts = head.uri.into();
uri.scheme = Some(self.scheme.clone());
uri.authority = Some(self.authority.clone());
head.uri = http::Uri::from_parts(uri).expect("valid uri");

self.inner.call(http::Request::from_parts(head, body))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -38,6 +38,7 @@ extern crate tempdir;
extern crate tokio;
extern crate tokio_connect;
extern crate tokio_timer;
extern crate tower_add_origin;
extern crate tower_balance;
extern crate tower_buffer;
extern crate tower_discover;
Expand Down

0 comments on commit 1774c87

Please sign in to comment.