From 8ae72723cf83a68cfe2d5393ff09f3532afdda4e Mon Sep 17 00:00:00 2001
From: Panther0087
Date: Tue, 17 Sep 2019 13:11:42 -0700
Subject: [PATCH] Split service discovery into composable components (#341)

The `linkerd2-proxy-resolve` crate, which was split out of the main
application in #318, contains a monolithic `Resolve` implementation. Over
time, this code has evolved in ways that have made it difficult to modify
and extend, especially since its functionality cannot be tested
independently.

This change reorganizes the proxy's service discovery logic into a few
composable components. The `Resolve` API has been updated to facilitate
this.

Service discovery consists of two primary traits: `Resolve` and
`Discover`. `Resolve` is used within the proxy to model discovery sources
like the Destination API and DNS, while `Discover` updates a load balancer
with changes to its replica pool.

The former `Resolve` trait has been replaced with a trait alias for
`Service`, which allows for back-pressure, etc. Furthermore, the API has
been updated so that it is easier to gracefully recover after a resolution
stream is lost, by changing `Update` to carry batches of endpoints in a
`Vec`.

Now that `Resolve` is implemented as a `Service`, it becomes
straightforward to model service discovery in terms of a stack of
services, as we do throughout the project:

* The `linkerd2-proxy-api-resolve` crate provides an implementation of
  `Resolve` backed by the Destination API. It does NOT handle reconnects,
  buffering/spawning, etc.
* The `linkerd2-request-filter` crate provides a simple middleware that
  can fail requests based on properties of the request. This is used to
  implement the destination client's name/suffix restrictions.
* The `linkerd2-proxy-resolve` crate provides `Resolve` middlewares that
  implement error recovery/backoff and endpoint-type mapping.
* The `linkerd2-proxy-discover` crate provides a suite of middlewares
  that facilitate powering a balancer with a `Resolve`. Specifically:
  * `buffer` ensures that discovery updates are processed even when the
    balancer is not polling for updates.
  * `from_resolve` wraps a `Resolve` so that it produces a `Discover`.
  * `make_endpoint` wraps a `Discover`-producing service to build each
    inserted endpoint into a `Service`.

Almost all of this logic previously lived in a single module, coupled to
our gRPC interface. Now, these components may easily be re-purposed (for
instance, to back discovery with DNS).
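For illustration only (not part of this patch), the sketch below shows what
a minimal `Resolution` looks like under the new API defined in
`linkerd2-proxy-core`. The type name `FixedResolution` and the `()`
endpoint type are hypothetical; it assumes the `linkerd2_proxy_core` and
`linkerd2_error` crate paths introduced in this change:

    use futures::{Async, Poll};
    use linkerd2_error::Error;
    use linkerd2_proxy_core::resolve::{Resolution, Update};
    use std::net::SocketAddr;

    /// A static resolution that yields a fixed endpoint set once and then
    /// idles. Illustrative only.
    pub struct FixedResolution {
        endpoints: Option<Vec<(SocketAddr, ())>>,
    }

    impl Resolution for FixedResolution {
        type Endpoint = ();
        type Error = Error;

        fn poll(&mut self) -> Poll<Update<Self::Endpoint>, Self::Error> {
            match self.endpoints.take() {
                // The first poll delivers the whole replica set as one
                // batched `Add`, using the new `Vec`-carrying variants.
                Some(eps) => Ok(Async::Ready(Update::Add(eps))),
                // A resolution is an infinite stream of updates, so
                // subsequent polls simply park (nothing changes here).
                None => Ok(Async::NotReady),
            }
        }
    }

Because `Resolve` is now a trait alias for a `tower::Service` whose
response is a `Resolution`, a type like this can be fed into the
`linkerd2-proxy-discover` stack (e.g. via `from_resolve`) without touching
the gRPC client.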
--- Cargo.lock | 89 ++-- Cargo.toml | 39 +- lib/linkerd2-proxy-api-resolve/Cargo.toml | 18 + lib/linkerd2-proxy-api-resolve/src/lib.rs | 10 + .../src/metadata.rs | 76 +++ lib/linkerd2-proxy-api-resolve/src/pb.rs | 109 ++++ lib/linkerd2-proxy-api-resolve/src/resolve.rs | 142 +++++ lib/linkerd2-proxy-core/Cargo.toml | 1 + lib/linkerd2-proxy-core/src/resolve.rs | 78 ++- lib/linkerd2-proxy-discover/Cargo.toml | 20 + lib/linkerd2-proxy-discover/src/buffer.rs | 155 ++++++ .../src/from_resolve.rs | 118 +++++ lib/linkerd2-proxy-discover/src/lib.rs | 58 +++ .../src/make_endpoint.rs | 341 ++++++------ lib/linkerd2-proxy-resolve/Cargo.toml | 9 +- .../src/destination/client.rs | 95 ---- .../src/destination/mod.rs | 163 ------ .../src/destination/resolution.rs | 443 ---------------- lib/linkerd2-proxy-resolve/src/lib.rs | 14 +- .../src/map_endpoint.rs | 136 +++++ lib/linkerd2-proxy-resolve/src/recover.rs | 489 ++++++++++++++++++ .../src/remote_stream.rs | 67 --- lib/linkerd2-request-filter/Cargo.toml | 11 + lib/linkerd2-request-filter/src/lib.rs | 83 +++ src/app/main.rs | 14 +- src/app/outbound/discovery.rs | 143 ----- src/app/outbound/endpoint.rs | 31 +- src/app/outbound/mod.rs | 32 +- src/app/outbound/resolve.rs | 112 ++++ src/lib.rs | 2 +- src/proxy/http/profiles/router.rs | 40 +- src/proxy/mod.rs | 1 - src/svc.rs | 11 +- 33 files changed, 1944 insertions(+), 1206 deletions(-) create mode 100644 lib/linkerd2-proxy-api-resolve/Cargo.toml create mode 100644 lib/linkerd2-proxy-api-resolve/src/lib.rs create mode 100644 lib/linkerd2-proxy-api-resolve/src/metadata.rs create mode 100644 lib/linkerd2-proxy-api-resolve/src/pb.rs create mode 100644 lib/linkerd2-proxy-api-resolve/src/resolve.rs create mode 100644 lib/linkerd2-proxy-discover/Cargo.toml create mode 100644 lib/linkerd2-proxy-discover/src/buffer.rs create mode 100644 lib/linkerd2-proxy-discover/src/from_resolve.rs create mode 100644 lib/linkerd2-proxy-discover/src/lib.rs rename src/proxy/resolve.rs => lib/linkerd2-proxy-discover/src/make_endpoint.rs (61%) delete mode 100644 lib/linkerd2-proxy-resolve/src/destination/client.rs delete mode 100644 lib/linkerd2-proxy-resolve/src/destination/mod.rs delete mode 100644 lib/linkerd2-proxy-resolve/src/destination/resolution.rs create mode 100644 lib/linkerd2-proxy-resolve/src/map_endpoint.rs create mode 100644 lib/linkerd2-proxy-resolve/src/recover.rs delete mode 100644 lib/linkerd2-proxy-resolve/src/remote_stream.rs create mode 100644 lib/linkerd2-request-filter/Cargo.toml create mode 100644 lib/linkerd2-request-filter/src/lib.rs delete mode 100644 src/app/outbound/discovery.rs create mode 100644 src/app/outbound/resolve.rs diff --git a/Cargo.lock b/Cargo.lock index 773a31b..d8fce51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,11 +314,6 @@ dependencies = [ "unicode-segmentation 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "hex" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "hostname" version = "0.1.4" @@ -604,15 +599,17 @@ dependencies = [ "linkerd2-identity 0.1.0", "linkerd2-metrics 0.1.0", "linkerd2-proxy-api 0.1.8 (git+https://github.com/linkerd/linkerd2-proxy-api?rev=ddbc3a4f7f8b0058801f896d27974d19ee98094c)", + "linkerd2-proxy-api-resolve 0.1.0", "linkerd2-proxy-core 0.1.0", + "linkerd2-proxy-discover 0.1.0", "linkerd2-proxy-resolve 0.1.0", "linkerd2-reconnect 0.1.0", + "linkerd2-request-filter 0.1.0", "linkerd2-router 0.1.0", "linkerd2-signal 0.1.0", "linkerd2-stack 0.1.0", "linkerd2-task 
0.1.0", "linkerd2-timeout 0.1.0", - "linkerd2-trace-context 0.1.0", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", "procinfo 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -665,6 +662,22 @@ dependencies = [ "tower-grpc-build 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", ] +[[package]] +name = "linkerd2-proxy-api-resolve" +version = "0.1.0" +dependencies = [ + "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "linkerd2-identity 0.1.0", + "linkerd2-proxy-api 0.1.8 (git+https://github.com/linkerd/linkerd2-proxy-api?rev=ddbc3a4f7f8b0058801f896d27974d19ee98094c)", + "linkerd2-proxy-core 0.1.0", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", + "tracing 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "linkerd2-proxy-core" version = "0.1.0" @@ -672,27 +685,39 @@ dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "linkerd2-drain 0.1.0", "linkerd2-error 0.1.0", + "tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] -name = "linkerd2-proxy-resolve" +name = "linkerd2-proxy-discover" version = "0.1.0" dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", - "linkerd2-addr 0.1.0", - "linkerd2-dns-name 0.1.0", "linkerd2-error 0.1.0", - "linkerd2-identity 0.1.0", - "linkerd2-proxy-api 0.1.8 (git+https://github.com/linkerd/linkerd2-proxy-api?rev=ddbc3a4f7f8b0058801f896d27974d19ee98094c)", "linkerd2-proxy-core 0.1.0", "linkerd2-task 0.1.0", - "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", + "tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", + "tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tracing 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-futures 0.0.1-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "linkerd2-proxy-resolve" +version = "0.1.0" +dependencies = [ + "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "linkerd2-error 0.1.0", + "linkerd2-proxy-core 0.1.0", + "linkerd2-task 0.1.0", + "tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", + "tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "linkerd2-reconnect" version = "0.1.0" @@ -704,6 +729,15 @@ dependencies = [ "tracing 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "linkerd2-request-filter" +version = "0.1.0" +dependencies = [ + "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", + "tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing 0.1.8 
(registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "linkerd2-router" version = "0.1.0" @@ -764,22 +798,6 @@ dependencies = [ "tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "linkerd2-trace-context" -version = "0.1.0" -dependencies = [ - "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", - "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", - "linkerd2-error 0.1.0", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", - "tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "lock_api" version = "0.1.5" @@ -1676,12 +1694,12 @@ dependencies = [ [[package]] name = "tower-balance" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#793e2e8e94b5ca6c1d12171b3909d78505ab6667" +source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15" dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "tower-discover 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1689,7 +1707,6 @@ dependencies = [ "tower-load 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-service 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1765,7 +1782,7 @@ dependencies = [ [[package]] name = "tower-load" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#793e2e8e94b5ca6c1d12171b3909d78505ab6667" +source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15" dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1816,7 +1833,7 @@ dependencies = [ [[package]] name = "tower-spawn-ready" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#793e2e8e94b5ca6c1d12171b3909d78505ab6667" +source = "git+https://github.com/tower-rs/tower#b39a4881d8eff6cbbb3a49b95db8492f6f8afb15" dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1854,7 +1871,6 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "spin 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-attributes 0.1.2 
(registry+https://github.com/rust-lang/crates.io-index)", "tracing-core 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1901,7 +1917,7 @@ dependencies = [ [[package]] name = "tracing-subscriber" version = "0.1.1" -source = "git+https://github.com/tokio-rs/tracing#49dab30847823a10f4398595616c19c0ee96d737" +source = "git+https://github.com/tokio-rs/tracing#198e62a613e1fcf623e8f2c66e1192504f5e9b2f" dependencies = [ "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2240,7 +2256,6 @@ dependencies = [ "checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40" "checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462" "checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" -"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" "checksum hostname 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "58fab6e177434b0bb4cd344a4dabaa5bd6d7a8d792b1885aebcae7af1091d1cb" "checksum http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "fe67e3678f2827030e89cc4b9e7ecd16d52f132c0b940ab5005f88e821500f6a" "checksum http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d" diff --git a/Cargo.toml b/Cargo.toml index 64bcebd..3156353 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,11 @@ members = [ "lib/linkerd2-identity", "lib/linkerd2-metrics", "lib/linkerd2-opencensus", + "lib/linkerd2-proxy-api-resolve", "lib/linkerd2-proxy-core", + "lib/linkerd2-proxy-discover", "lib/linkerd2-proxy-resolve", + "lib/linkerd2-request-filter", "lib/linkerd2-reconnect", "lib/linkerd2-router", "lib/linkerd2-signal", @@ -37,23 +40,25 @@ flaky_tests = [] [dependencies] hyper-balance = { path = "lib/hyper-balance" } -linkerd2-addr = { path = "lib/linkerd2-addr" } -linkerd2-conditional = { path = "lib/linkerd2-conditional" } -linkerd2-dns-name = { path = "lib/linkerd2-dns-name" } -linkerd2-error = { path = "lib/linkerd2-error" } -linkerd2-fallback = { path = "lib/linkerd2-fallback" } -linkerd2-identity = { path = "lib/linkerd2-identity" } -linkerd2-metrics = { path = "lib/linkerd2-metrics" } -linkerd2-proxy-core = { path = "lib/linkerd2-proxy-core" } -linkerd2-exp-backoff = { path = "lib/linkerd2-exp-backoff" } -linkerd2-proxy-resolve = { path = "lib/linkerd2-proxy-resolve" } -linkerd2-reconnect = { path = "lib/linkerd2-reconnect" } -linkerd2-router = { path = "lib/linkerd2-router" } -linkerd2-signal = { path = "lib/linkerd2-signal" } -linkerd2-stack = { path = "lib/linkerd2-stack" } -linkerd2-task = { path = "lib/linkerd2-task" } -linkerd2-timeout = { path = "lib/linkerd2-timeout" } -linkerd2-trace-context = { path = "lib/linkerd2-trace-context" } +linkerd2-addr = { path = "lib/linkerd2-addr" } +linkerd2-conditional = { path = "lib/linkerd2-conditional" } +linkerd2-dns-name = { path = "lib/linkerd2-dns-name" } +linkerd2-error = { path = "lib/linkerd2-error" } +linkerd2-fallback = { path = "lib/linkerd2-fallback" } +linkerd2-identity = { path = "lib/linkerd2-identity" } +linkerd2-metrics = { path = "lib/linkerd2-metrics" } 
+linkerd2-exp-backoff = { path = "lib/linkerd2-exp-backoff" } +linkerd2-proxy-core = { path = "lib/linkerd2-proxy-core" } +linkerd2-proxy-api-resolve = { path = "lib/linkerd2-proxy-api-resolve" } +linkerd2-proxy-discover = { path = "lib/linkerd2-proxy-discover" } +linkerd2-proxy-resolve = { path = "lib/linkerd2-proxy-resolve" } +linkerd2-reconnect = { path = "lib/linkerd2-reconnect" } +linkerd2-request-filter = { path = "lib/linkerd2-request-filter" } +linkerd2-router = { path = "lib/linkerd2-router" } +linkerd2-signal = { path = "lib/linkerd2-signal" } +linkerd2-stack = { path = "lib/linkerd2-stack" } +linkerd2-task = { path = "lib/linkerd2-task" } +linkerd2-timeout = { path = "lib/linkerd2-timeout" } linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "ddbc3a4f7f8b0058801f896d27974d19ee98094c" } diff --git a/lib/linkerd2-proxy-api-resolve/Cargo.toml b/lib/linkerd2-proxy-api-resolve/Cargo.toml new file mode 100644 index 0000000..8609b71 --- /dev/null +++ b/lib/linkerd2-proxy-api-resolve/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "linkerd2-proxy-api-resolve" +version = "0.1.0" +authors = ["Linkerd Developers "] +edition = "2018" +publish = false + +[dependencies] +futures = "0.1" +linkerd2-identity = { path = "../linkerd2-identity" } +linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "ddbc3a4f7f8b0058801f896d27974d19ee98094c" } +linkerd2-proxy-core = { path = "../linkerd2-proxy-core" } +prost = "0.5.0" +tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] } +indexmap = "1.0" +tokio-sync = "0.1" +tower = "0.1" +tracing = "0.1" diff --git a/lib/linkerd2-proxy-api-resolve/src/lib.rs b/lib/linkerd2-proxy-api-resolve/src/lib.rs new file mode 100644 index 0000000..cd7caf4 --- /dev/null +++ b/lib/linkerd2-proxy-api-resolve/src/lib.rs @@ -0,0 +1,10 @@ +use linkerd2_identity as identity; +use linkerd2_proxy_api as api; +use linkerd2_proxy_core as core; + +mod metadata; +mod pb; +mod resolve; + +pub use self::metadata::{Metadata, ProtocolHint}; +pub use self::resolve::Resolve; diff --git a/lib/linkerd2-proxy-api-resolve/src/metadata.rs b/lib/linkerd2-proxy-api-resolve/src/metadata.rs new file mode 100644 index 0000000..bdc6fd8 --- /dev/null +++ b/lib/linkerd2-proxy-api-resolve/src/metadata.rs @@ -0,0 +1,76 @@ +use crate::identity; +use indexmap::IndexMap; + +/// Metadata describing an endpoint. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Metadata { + /// An endpoint's relative weight. + /// + /// A weight of 0 means that the endpoint should never be preferred over a + /// non 0-weighted endpoint. + /// + /// The default weight, corresponding to 1.0, is 10,000. This enables us to + /// specify weights as small as 0.0001 and as large as 400,000+. + /// + /// A float is not used so that this type can implement `Eq`. + weight: u32, + + /// Arbitrary endpoint labels. Primarily used for telemetry. + labels: IndexMap, + + /// A hint from the controller about what protocol (HTTP1, HTTP2, etc) the + /// destination understands. + protocol_hint: ProtocolHint, + + /// How to verify TLS for the endpoint. + identity: Option, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ProtocolHint { + /// We don't what the destination understands, so forward messages in the + /// protocol we received them in. + Unknown, + /// The destination can receive HTTP2 messages. 
+ Http2, +} + +// === impl Metadata === + +impl Metadata { + pub fn empty() -> Self { + Self { + labels: IndexMap::default(), + protocol_hint: ProtocolHint::Unknown, + identity: None, + weight: 10_000, + } + } + + pub fn new( + labels: IndexMap, + protocol_hint: ProtocolHint, + identity: Option, + weight: u32, + ) -> Self { + Self { + labels, + protocol_hint, + identity, + weight, + } + } + + /// Returns the endpoint's labels from the destination service, if it has them. + pub fn labels(&self) -> &IndexMap { + &self.labels + } + + pub fn protocol_hint(&self) -> ProtocolHint { + self.protocol_hint + } + + pub fn identity(&self) -> Option<&identity::Name> { + self.identity.as_ref() + } +} diff --git a/lib/linkerd2-proxy-api-resolve/src/pb.rs b/lib/linkerd2-proxy-api-resolve/src/pb.rs new file mode 100644 index 0000000..386b5d1 --- /dev/null +++ b/lib/linkerd2-proxy-api-resolve/src/pb.rs @@ -0,0 +1,109 @@ +use crate::api::destination::{protocol_hint::Protocol, TlsIdentity, WeightedAddr}; +use crate::api::net::TcpAddress; +use crate::identity; +use crate::metadata::{Metadata, ProtocolHint}; +use indexmap::IndexMap; +use std::{collections::HashMap, net::SocketAddr}; + +/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. +pub(in crate) fn to_addr_meta( + pb: WeightedAddr, + set_labels: &HashMap, +) -> Option<(SocketAddr, Metadata)> { + let addr = pb.addr.and_then(to_sock_addr)?; + + let meta = { + let mut t = set_labels + .iter() + .chain(pb.metric_labels.iter()) + .collect::>(); + t.sort_by(|(k0, _), (k1, _)| k0.cmp(k1)); + + let mut m = IndexMap::with_capacity(t.len()); + for (k, v) in t.into_iter() { + m.insert(k.clone(), v.clone()); + } + + m + }; + + let mut proto_hint = ProtocolHint::Unknown; + if let Some(hint) = pb.protocol_hint { + if let Some(proto) = hint.protocol { + match proto { + Protocol::H2(..) 
=> { + proto_hint = ProtocolHint::Http2; + } + } + } + } + + let tls_id = pb.tls_identity.and_then(to_id); + let meta = Metadata::new(meta, proto_hint, tls_id, pb.weight); + Some((addr, meta)) +} + +fn to_id(pb: TlsIdentity) -> Option { + use crate::api::destination::tls_identity::Strategy; + + let Strategy::DnsLikeIdentity(i) = pb.strategy?; + match identity::Name::from_hostname(i.name.as_bytes()) { + Ok(i) => Some(i), + Err(_) => { + tracing::warn!("Ignoring invalid identity: {}", i.name); + None + } + } +} + +pub(in crate) fn to_sock_addr(pb: TcpAddress) -> Option { + use crate::api::net::ip_address::Ip; + use std::net::{Ipv4Addr, Ipv6Addr}; + /* + current structure is: + TcpAddress { + ip: Option, + }>, + port: u32, + } + */ + match pb.ip { + Some(ip) => match ip.ip { + Some(Ip::Ipv4(octets)) => { + let ipv4 = Ipv4Addr::from(octets); + Some(SocketAddr::from((ipv4, pb.port as u16))) + } + Some(Ip::Ipv6(v6)) => { + let octets = [ + (v6.first >> 56) as u8, + (v6.first >> 48) as u8, + (v6.first >> 40) as u8, + (v6.first >> 32) as u8, + (v6.first >> 24) as u8, + (v6.first >> 16) as u8, + (v6.first >> 8) as u8, + v6.first as u8, + (v6.last >> 56) as u8, + (v6.last >> 48) as u8, + (v6.last >> 40) as u8, + (v6.last >> 32) as u8, + (v6.last >> 24) as u8, + (v6.last >> 16) as u8, + (v6.last >> 8) as u8, + v6.last as u8, + ]; + let ipv6 = Ipv6Addr::from(octets); + Some(SocketAddr::from((ipv6, pb.port as u16))) + } + None => None, + }, + None => None, + } +} diff --git a/lib/linkerd2-proxy-api-resolve/src/resolve.rs b/lib/linkerd2-proxy-api-resolve/src/resolve.rs new file mode 100644 index 0000000..7cbb336 --- /dev/null +++ b/lib/linkerd2-proxy-api-resolve/src/resolve.rs @@ -0,0 +1,142 @@ +use crate::api::destination as api; +use crate::core::resolve::{self, Update}; +use crate::metadata::Metadata; +use crate::pb; +use futures::{future, try_ready, Future, Poll, Stream}; +use tower::Service; +use tower_grpc::{self as grpc, generic::client::GrpcService, Body, BoxBody}; +use tracing::{debug, trace}; + +#[derive(Clone)] +pub struct Resolve { + service: api::client::Destination, + scheme: String, + context_token: String, +} + +pub struct Resolution> { + inner: grpc::Streaming, +} + +// === impl Resolver === + +impl Resolve +where + S: GrpcService + Clone + Send + 'static, + S::ResponseBody: Send, + ::Data: Send, + S::Future: Send, +{ + pub fn new(svc: S) -> Self { + Self { + service: api::client::Destination::new(svc), + scheme: "".into(), + context_token: "".into(), + } + } + + pub fn with_scheme(self, scheme: T) -> Self { + Self { + scheme: scheme.to_string(), + ..self + } + } + + pub fn with_context_token(self, context_token: T) -> Self { + Self { + context_token: context_token.to_string(), + ..self + } + } +} + +impl Service for Resolve +where + T: ToString, + S: GrpcService + Clone + Send + 'static, + S::ResponseBody: Send, + ::Data: Send, + S::Future: Send, +{ + type Response = Resolution; + type Error = grpc::Status; + type Future = future::Map< + grpc::client::server_streaming::ResponseFuture, + fn(grpc::Response>) -> Resolution, + >; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready() + } + + fn call(&mut self, target: T) -> Self::Future { + let path = target.to_string(); + trace!("resolve {:?}", path); + self.service + .get(grpc::Request::new(api::GetDestination { + path, + scheme: self.scheme.clone(), + context_token: self.context_token.clone(), + })) + .map(|rsp| { + debug!(metadata = ?rsp.metadata()); + Resolution { + inner: rsp.into_inner(), + } + }) + } +} 
+ +// === impl ResolveFuture === + +impl resolve::Resolution for Resolution +where + S: GrpcService, +{ + type Endpoint = Metadata; + type Error = grpc::Status; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + match try_ready!(self.inner.poll()) { + Some(api::Update { update }) => match update { + Some(api::update::Update::Add(api::WeightedAddrSet { + addrs, + metric_labels, + })) => { + let addr_metas = addrs + .into_iter() + .filter_map(|addr| pb::to_addr_meta(addr, &metric_labels)) + .collect::>(); + if !addr_metas.is_empty() { + return Ok(Update::Add(addr_metas).into()); + } + } + + Some(api::update::Update::Remove(api::AddrSet { addrs })) => { + let sock_addrs = addrs + .into_iter() + .filter_map(pb::to_sock_addr) + .collect::>(); + if !sock_addrs.is_empty() { + return Ok(Update::Remove(sock_addrs).into()); + } + } + + Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => { + let update = if exists { + Update::Empty + } else { + Update::DoesNotExist + }; + return Ok(update.into()); + } + + None => {} // continue + }, + + None => return Err(grpc::Status::new(grpc::Code::Ok, "end of stream")), + }; + } + } +} diff --git a/lib/linkerd2-proxy-core/Cargo.toml b/lib/linkerd2-proxy-core/Cargo.toml index 9b50373..477a51f 100644 --- a/lib/linkerd2-proxy-core/Cargo.toml +++ b/lib/linkerd2-proxy-core/Cargo.toml @@ -9,3 +9,4 @@ publish = false futures = "0.1" linkerd2-drain = { path = "../linkerd2-drain" } linkerd2-error = { path = "../linkerd2-error" } +tower = "0.1" diff --git a/lib/linkerd2-proxy-core/src/resolve.rs b/lib/linkerd2-proxy-core/src/resolve.rs index cab88ff..8401d16 100644 --- a/lib/linkerd2-proxy-core/src/resolve.rs +++ b/lib/linkerd2-proxy-core/src/resolve.rs @@ -1,31 +1,87 @@ use futures::{Future, Poll}; +use linkerd2_error::Error; use std::net::SocketAddr; /// Resolves `T`-typed names/addresses as a `Resolution`. pub trait Resolve { type Endpoint; - type Future: Future; + type Error: Into; type Resolution: Resolution; + type Future: Future; - /// Asynchronously returns a `Resolution` for the given `target`. - /// - /// The returned future will complete with a `Resolution` if this resolver - /// was able to successfully resolve `target`. Otherwise, if it completes - /// with an error, that name or address should not be resolved by this - /// resolver. - fn resolve(&self, target: &T) -> Self::Future; + fn poll_ready(&mut self) -> Poll<(), Self::Error>; + + fn resolve(&mut self, target: T) -> Self::Future; + + fn into_service(self) -> Service + where + Self: Sized, + { + Service(self) + } } /// An infinite stream of endpoint updates. 
pub trait Resolution { type Endpoint; - type Error; + type Error: Into; fn poll(&mut self) -> Poll, Self::Error>; } #[derive(Clone, Debug)] +pub struct Service(S); + +#[derive(Clone, Debug, PartialEq)] pub enum Update { - Add(SocketAddr, T), - Remove(SocketAddr), + Add(Vec<(SocketAddr, T)>), + Remove(Vec), + Empty, + DoesNotExist, +} + +// === impl Resolve === + +impl Resolve for S +where + S: tower::Service, + S::Error: Into, + R: Resolution, +{ + type Endpoint = ::Endpoint; + type Error = S::Error; + type Resolution = S::Response; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + tower::Service::poll_ready(self) + } + + #[inline] + fn resolve(&mut self, target: T) -> Self::Future { + tower::Service::call(self, target) + } +} + +// === impl Service === + +impl tower::Service for Service +where + R: Resolve, + R::Error: Into, +{ + type Error = R::Error; + type Response = R::Resolution; + type Future = R::Future; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready() + } + + #[inline] + fn call(&mut self, target: T) -> Self::Future { + self.0.resolve(target) + } } diff --git a/lib/linkerd2-proxy-discover/Cargo.toml b/lib/linkerd2-proxy-discover/Cargo.toml new file mode 100644 index 0000000..bfe7ea5 --- /dev/null +++ b/lib/linkerd2-proxy-discover/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "linkerd2-proxy-discover" +version = "0.1.0" +authors = ["Linkerd Developers "] +edition = "2018" +publish = false + +[dependencies] +futures = "0.1" +linkerd2-error = { path = "../linkerd2-error" } +linkerd2-proxy-core = { path = "../linkerd2-proxy-core" } +linkerd2-task = { path = "../linkerd2-task" } +indexmap = "1.0" +tokio = "0.1" +tower = "0.1" +tracing = "0.1" +tracing-futures = "0.0.1-alpha.1" + +[dev-dependencies] +tower-util = "0.1" diff --git a/lib/linkerd2-proxy-discover/src/buffer.rs b/lib/linkerd2-proxy-discover/src/buffer.rs new file mode 100644 index 0000000..5f1653f --- /dev/null +++ b/lib/linkerd2-proxy-discover/src/buffer.rs @@ -0,0 +1,155 @@ +use futures::{try_ready, Async, Future, Poll, Stream}; +use linkerd2_error::{Error, Never}; +use linkerd2_task as task; +use std::fmt; +use tokio::sync::{mpsc, oneshot}; +use tower::discover; +use tracing::info_span; +use tracing_futures::Instrument; + +#[derive(Clone, Debug)] +pub struct Buffer { + capacity: usize, + inner: M, +} + +#[derive(Debug)] +pub struct Discover { + rx: mpsc::Receiver>, + _disconnect_tx: oneshot::Sender, +} + +pub struct DiscoverFuture { + future: F, + target: String, + capacity: usize, + _marker: std::marker::PhantomData D>, +} + +pub struct Daemon { + discover: D, + disconnect_rx: oneshot::Receiver, + tx: mpsc::Sender>, +} + +#[derive(Clone, Debug)] +pub struct Lost(()); + +impl Buffer { + pub fn new(capacity: usize, inner: M) -> Self + where + Self: tower::Service, + { + Self { capacity, inner } + } +} + +impl tower::Service for Buffer +where + T: fmt::Display, + M: tower::Service, + D: discover::Discover + Send + 'static, + D::Error: Into, + D::Key: Send, + D::Service: Send, +{ + type Response = Discover; + type Error = M::Error; + type Future = DiscoverFuture; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, req: T) -> Self::Future { + let target = req.to_string().into(); + let future = self.inner.call(req); + Self::Future { + future, + target, + capacity: self.capacity, + _marker: std::marker::PhantomData, + } + } +} + +impl Future for DiscoverFuture +where + F: 
Future, + D: discover::Discover + Send + 'static, + D::Error: Into, + D::Key: Send, + D::Service: Send, +{ + type Item = Discover; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let discover = try_ready!(self.future.poll()); + + let (tx, rx) = mpsc::channel(self.capacity); + let (_disconnect_tx, disconnect_rx) = oneshot::channel(); + let fut = Daemon { + discover, + disconnect_rx, + tx, + }; + task::spawn(fut.instrument(info_span!("discover", target = %self.target))); + + Ok(Discover { rx, _disconnect_tx }.into()) + } +} + +impl Future for Daemon +where + D: discover::Discover, + D::Error: Into, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + match self.disconnect_rx.poll() { + Ok(Async::NotReady) => {} + Err(_lost) => return Ok(().into()), + Ok(Async::Ready(n)) => match n {}, + } + + try_ready!(self + .tx + .poll_ready() + .map_err(|_| tracing::trace!("lost sender"))); + + let up = try_ready!(self.discover.poll().map_err(|e| { + let e: Error = e.into(); + tracing::debug!("resoution lost: {}", e); + })); + + self.tx.try_send(up).ok().expect("sender must be ready"); + } + } +} + +impl tower::discover::Discover for Discover { + type Key = K; + type Service = S; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + return match self.rx.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(Some(change))) => Ok(Async::Ready(change)), + Err(_) | Ok(Async::Ready(None)) => Err(Lost(()).into()), + }; + } +} + +impl std::fmt::Display for Lost { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "discovery task failed") + } +} + +impl std::error::Error for Lost {} diff --git a/lib/linkerd2-proxy-discover/src/from_resolve.rs b/lib/linkerd2-proxy-discover/src/from_resolve.rs new file mode 100644 index 0000000..a6b863d --- /dev/null +++ b/lib/linkerd2-proxy-discover/src/from_resolve.rs @@ -0,0 +1,118 @@ +use futures::{try_ready, Async, Future, Poll}; +use indexmap::IndexSet; +use linkerd2_proxy_core::resolve::{Resolution, Resolve, Update}; +use std::collections::VecDeque; +use std::net::SocketAddr; +use tower::discover::Change; + +#[derive(Clone, Debug)] +pub struct FromResolve { + resolve: R, +} + +#[derive(Debug)] +pub struct DiscoverFuture { + future: F, +} + +/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to +/// build a service for each endpoint. 
+pub struct Discover { + resolution: R, + active: IndexSet, + pending: VecDeque>, +} + +// === impl FromResolve === + +impl FromResolve { + pub fn new(resolve: R) -> Self + where + R: Resolve, + { + Self { resolve } + } +} + +impl tower::Service for FromResolve +where + R: Resolve + Clone, +{ + type Response = Discover; + type Error = R::Error; + type Future = DiscoverFuture; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.resolve.poll_ready() + } + + #[inline] + fn call(&mut self, target: T) -> Self::Future { + Self::Future { + future: self.resolve.resolve(target), + } + } +} + +// === impl DiscoverFuture === + +impl Future for DiscoverFuture +where + F: Future, + F::Item: Resolution, +{ + type Item = Discover; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let resolution = try_ready!(self.future.poll()); + Ok(Async::Ready(Discover::new(resolution))) + } +} + +// === impl Discover === + +impl Discover { + pub fn new(resolution: R) -> Self { + Self { + resolution, + active: IndexSet::default(), + pending: VecDeque::new(), + } + } +} + +impl tower::discover::Discover for Discover { + type Key = SocketAddr; + type Service = R::Endpoint; + type Error = R::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if let Some(change) = self.pending.pop_front() { + return Ok(change.into()); + } + + match try_ready!(self.resolution.poll()) { + Update::Add(endpoints) => { + for (addr, endpoint) in endpoints.into_iter() { + self.active.insert(addr); + self.pending.push_back(Change::Insert(addr, endpoint)); + } + } + Update::Remove(addrs) => { + for addr in addrs.into_iter() { + if self.active.remove(&addr) { + self.pending.push_back(Change::Remove(addr)); + } + } + } + Update::DoesNotExist | Update::Empty => { + self.pending + .extend(self.active.drain(..).map(Change::Remove)); + } + } + } + } +} diff --git a/lib/linkerd2-proxy-discover/src/lib.rs b/lib/linkerd2-proxy-discover/src/lib.rs new file mode 100644 index 0000000..e06fcaf --- /dev/null +++ b/lib/linkerd2-proxy-discover/src/lib.rs @@ -0,0 +1,58 @@ +#![deny(warnings, rust_2018_idioms)] + +use linkerd2_error::Error; +use linkerd2_proxy_core::Resolve; +use std::fmt; + +pub mod buffer; +pub mod from_resolve; +pub mod make_endpoint; + +use self::buffer::Buffer; +use self::from_resolve::FromResolve; +use self::make_endpoint::MakeEndpoint; + +#[derive(Clone, Debug)] +pub struct Layer { + capacity: usize, + resolve: R, + _marker: std::marker::PhantomData, +} + +// === impl Layer === + +impl Layer { + pub fn new(capacity: usize, resolve: R) -> Self + where + R: Resolve + Clone, + R::Endpoint: fmt::Debug + Clone + PartialEq, + { + Self { + capacity, + resolve, + _marker: std::marker::PhantomData, + } + } +} + +impl tower::layer::Layer for Layer +where + T: fmt::Display, + R: Resolve + Send + Clone + 'static, + R::Error: Into, + R::Endpoint: fmt::Debug + Clone + PartialEq + Send, + R::Resolution: Send + 'static, + R::Future: Send + 'static, + M: tower::Service + Clone + Send + 'static, + M::Error: Into, + M::Response: Send + 'static, + M::Future: Send + 'static, +{ + type Service = Buffer, M>>; + + fn layer(&self, make_endpoint: M) -> Self::Service { + let make_discover = + MakeEndpoint::new(make_endpoint, FromResolve::new(self.resolve.clone())); + Buffer::new(self.capacity, make_discover) + } +} diff --git a/src/proxy/resolve.rs b/lib/linkerd2-proxy-discover/src/make_endpoint.rs similarity index 61% rename from src/proxy/resolve.rs rename to lib/linkerd2-proxy-discover/src/make_endpoint.rs index 
15d47f7..8a0dd27 100644 --- a/src/proxy/resolve.rs +++ b/lib/linkerd2-proxy-discover/src/make_endpoint.rs @@ -1,45 +1,40 @@ -use crate::core::resolve::{Resolution, Resolve, Update}; -use crate::{svc, Error}; use futures::{stream::FuturesUnordered, try_ready, Async, Future, Poll, Stream}; use indexmap::IndexMap; -use std::{fmt, net::SocketAddr}; +use linkerd2_error::Error; +use std::hash::Hash; use tokio::sync::oneshot; -pub use tower_discover::Change; -use tracing::trace; +use tower::discover::{self, Change}; #[derive(Clone, Debug)] -pub struct Layer { - resolve: R, +pub struct MakeEndpoint { + make_discover: D, + make_endpoint: E, } -#[derive(Clone, Debug)] -pub struct MakeSvc { - resolve: R, - inner: M, +#[derive(Debug)] +pub struct DiscoverFuture { + future: F, + make_endpoint: Option, } /// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to /// build a service for each endpoint. -pub struct Discover> { - resolution: R, - make: M, - make_futures: MakeFutures, -} - -pub struct DiscoverFuture { - future: F, - make: M, +pub struct Discover> { + discover: D, + make_endpoint: E, + make_futures: MakeFutures, + pending_removals: Vec, } -struct MakeFutures { - futures: FuturesUnordered>, - cancelations: IndexMap>, +struct MakeFutures { + futures: FuturesUnordered>, + cancelations: IndexMap>, } -struct MakeFuture { +struct MakeFuture { + key: Option, inner: F, canceled: oneshot::Receiver<()>, - addr: SocketAddr, } enum MakeError { @@ -47,150 +42,157 @@ enum MakeError { Canceled, } -// === impl Layer === - -pub fn layer(resolve: R) -> Layer -where - R: Resolve + Clone, - R::Endpoint: fmt::Debug, -{ - Layer { resolve } -} - -impl svc::Layer for Layer -where - R: Clone, -{ - type Service = MakeSvc; - - fn layer(&self, inner: M) -> Self::Service { - MakeSvc { - resolve: self.resolve.clone(), - inner, +// === impl MakeEndpoint === + +impl MakeEndpoint { + pub fn new(make_endpoint: E, make_discover: D) -> Self + where + D: tower::Service, + InnerDiscover: discover::Discover, + InnerDiscover::Key: Clone, + InnerDiscover::Error: Into, + E: tower::Service + Clone, + E::Error: Into, + { + Self { + make_discover, + make_endpoint, } } } -// === impl MakeSvc === - -impl svc::Service for MakeSvc +impl tower::Service for MakeEndpoint where - R: Resolve, - R::Endpoint: fmt::Debug, - M: svc::Service + Clone, + D: tower::Service, + InnerDiscover: discover::Discover, + InnerDiscover::Key: Clone, + InnerDiscover::Error: Into, + E: tower::Service + Clone, + E::Error: Into, { - type Response = Discover; - type Error = ::Error; - type Future = DiscoverFuture>; + type Response = Discover; + type Error = D::Error; + type Future = DiscoverFuture; + #[inline] fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(().into()) // always ready to make a Discover + self.make_discover.poll_ready() } + #[inline] fn call(&mut self, target: T) -> Self::Future { - let future = self.resolve.resolve(&target); + let future = self.make_discover.call(target); DiscoverFuture { future, - make: Some(self.inner.clone()), + make_endpoint: Some(self.make_endpoint.clone()), } } } // === impl DiscoverFuture === -impl Future for DiscoverFuture> +impl Future for DiscoverFuture where - F: Future, - F::Item: Resolution, - M: svc::Service<::Endpoint>, + F: Future, + D: discover::Discover, + D::Key: Clone, + D::Error: Into, + E: tower::Service, + E::Error: Into, { - type Item = Discover; + type Item = Discover; type Error = F::Error; fn poll(&mut self) -> Poll { let resolution = try_ready!(self.future.poll()); - let make 
= self.make.take().expect("polled after ready"); - Ok(Async::Ready(Discover::new(resolution, make))) + let make_endpoint = self.make_endpoint.take().expect("polled after ready"); + Ok(Async::Ready(Discover::new(resolution, make_endpoint))) } } // === impl Discover === -impl Discover +impl Discover where - R: Resolution, - M: svc::Service, + D: discover::Discover, + D::Key: Clone, + D::Error: Into, + E: tower::Service, + E::Error: Into, { - fn new(resolution: R, make: M) -> Self { + pub fn new(discover: D, make_endpoint: E) -> Self { Self { - resolution, - make, + discover, + make_endpoint, make_futures: MakeFutures::new(), + pending_removals: Vec::new(), } } } -impl Discover +impl discover::Discover for Discover where - R: Resolution, - R::Endpoint: fmt::Debug, - R::Error: Into, - M: svc::Service, - M::Error: Into, + D: discover::Discover, + D::Key: Clone, + D::Error: Into, + E: tower::Service, + E::Error: Into, { - fn poll_resolution(&mut self) -> Poll, Error> { + type Key = D::Key; + type Service = E::Response; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + if let Async::Ready(key) = self.poll_removals()? { + return Ok(Async::Ready(Change::Remove(key))); + } + + if let Async::Ready(Some((key, svc))) = self.make_futures.poll().map_err(Into::into)? { + return Ok(Async::Ready(Change::Insert(key, svc))); + } + + Ok(Async::NotReady) + } +} + +impl Discover +where + D: discover::Discover, + D::Key: Clone, + D::Error: Into, + E: tower::Service, + E::Error: Into, +{ + fn poll_removals(&mut self) -> Poll { loop { + if let Some(key) = self.pending_removals.pop() { + self.make_futures.remove(&key); + return Ok(key.into()); + } + // Before polling the resolution, where we could potentially receive // an `Add`, poll_ready to ensure that `make` is ready to build new // services. Don't process any updates until we can do so. - try_ready!(self.make.poll_ready().map_err(Into::into)); + try_ready!(self.make_endpoint.poll_ready().map_err(Into::into)); - let update = try_ready!(self.resolution.poll().map_err(Into::into)); - trace!("watch: {:?}", update); - match update { - Update::Add(addr, target) => { + match try_ready!(self.discover.poll().map_err(Into::into)) { + Change::Insert(key, target) => { // Start building the service and continue. If a pending // service exists for this addr, it will be canceled. - let fut = self.make.call(target); - self.make_futures.push(addr, fut); + let fut = self.make_endpoint.call(target); + self.make_futures.push(key, fut); } - Update::Remove(addr) => { - self.make_futures.remove(&addr); - return Ok(Async::Ready(Change::Remove(addr))); + Change::Remove(key) => { + self.pending_removals.push(key); } } } } } -impl tower_discover::Discover for Discover -where - R: Resolution, - R::Endpoint: fmt::Debug, - R::Error: Into, - M: svc::Service, - M::Error: Into, -{ - type Key = SocketAddr; - type Service = M::Response; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - if let Async::Ready(change) = self.poll_resolution()? { - return Ok(Async::Ready(change)); - } - - if let Async::Ready(Some((addr, svc))) = self.make_futures.poll().map_err(Into::into)? 
{ - return Ok(Async::Ready(Change::Insert(addr, svc))); - } - - Ok(Async::NotReady) - } -} - // === impl MakeFutures === -impl MakeFutures { +impl MakeFutures { fn new() -> Self { Self { futures: FuturesUnordered::new(), @@ -198,27 +200,27 @@ impl MakeFutures { } } - fn push(&mut self, addr: SocketAddr, inner: F) { + fn push(&mut self, key: K, inner: F) { let (cancel, canceled) = oneshot::channel(); - if let Some(prior) = self.cancelations.insert(addr, cancel) { + if let Some(prior) = self.cancelations.insert(key.clone(), cancel) { let _ = prior.send(()); } self.futures.push(MakeFuture { - addr, + key: Some(key), inner, canceled, }); } - fn remove(&mut self, addr: &SocketAddr) { - if let Some(cancel) = self.cancelations.remove(addr) { + fn remove(&mut self, key: &K) { + if let Some(cancel) = self.cancelations.remove(key) { let _ = cancel.send(()); } } } -impl Stream for MakeFutures { - type Item = (SocketAddr, F::Item); +impl Stream for MakeFutures { + type Item = (K, F::Item); type Error = F::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -226,10 +228,10 @@ impl Stream for MakeFutures { return match self.futures.poll() { Err(MakeError::Canceled) => continue, Err(MakeError::Inner(err)) => Err(err), - Ok(Async::Ready(Some((addr, svc)))) => { - let _rm = self.cancelations.remove(&addr); - debug_assert!(_rm.is_some(), "cancelation missing for {}", addr); - Ok(Async::Ready(Some((addr, svc)))) + Ok(Async::Ready(Some((key, svc)))) => { + let _rm = self.cancelations.remove(&key); + debug_assert!(_rm.is_some(), "cancelation missing"); + Ok(Async::Ready(Some((key, svc)))) } Ok(r) => Ok(r), }; @@ -239,17 +241,17 @@ impl Stream for MakeFutures { // === impl MakeFuture === -impl Future for MakeFuture { - type Item = (SocketAddr, F::Item); +impl Future for MakeFuture { + type Item = (K, F::Item); type Error = MakeError; fn poll(&mut self) -> Poll { if let Ok(Async::Ready(())) = self.canceled.poll() { - trace!("canceled making service for {:?}", self.addr); return Err(MakeError::Canceled); } let svc = try_ready!(self.inner.poll()); - Ok((self.addr, svc).into()) + let key = self.key.take().expect("polled after complete"); + Ok((key, svc).into()) } } @@ -265,28 +267,18 @@ impl From for MakeError { mod tests { use super::*; use futures::future; - use svc::Service; + use std::net::SocketAddr; use tokio::sync::mpsc; - use tower_discover::{Change, Discover as _Discover}; + use tower::discover::{self, Change, Discover as _}; + use tower::Service; use tower_util::service_fn; - struct Urx(mpsc::Receiver>); - impl Resolution for Urx { - type Endpoint = E; - type Error = mpsc::error::RecvError; - - fn poll(&mut self) -> Poll, Self::Error> { - let ep = try_ready!(self.0.poll()).expect("stream must not terminate"); - Ok(Async::Ready(ep)) - } - } - #[derive(Debug)] - struct Svc(Vec>); - impl Service<()> for Svc { - type Response = T; - type Error = oneshot::error::RecvError; - type Future = oneshot::Receiver; + struct Svc(Vec); + impl Service<()> for Svc { + type Response = F::Item; + type Error = F::Error; + type Future = F; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(().into()) @@ -297,22 +289,36 @@ mod tests { } } + struct Dx(mpsc::Receiver>); + + impl discover::Discover for Dx { + type Key = SocketAddr; + type Service = (); + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let change = try_ready!(self.0.poll()).expect("stream must not end"); + Ok(change.into()) + } + } + #[test] fn inserts_delivered_out_of_order() { with_task(move || { - let (mut reso_tx, resolution) = 
mpsc::channel(2); - let (make0_tx, make0_rx) = oneshot::channel::>(); - let (make1_tx, make1_rx) = oneshot::channel::>(); - let make = Svc(vec![make1_rx, make0_rx]); + let (mut reso_tx, reso_rx) = mpsc::channel(2); + let (make0_tx, make0_rx) = oneshot::channel::>>(); + let (make1_tx, make1_rx) = oneshot::channel::>>(); - let mut discover = Discover::new(Urx(resolution), make); + let mut discover = Discover::new(Dx(reso_rx), Svc(vec![make1_rx, make0_rx])); assert!( - discover.poll().expect("discover can't fail").is_not_ready(), + discover::Discover::poll(&mut discover) + .expect("discover can't fail") + .is_not_ready(), "ready without updates" ); let addr0 = SocketAddr::from(([127, 0, 0, 1], 80)); - reso_tx.try_send(Update::Add(addr0, ())).unwrap(); + reso_tx.try_send(Change::Insert(addr0, ())).ok().unwrap(); assert!( discover.poll().expect("discover can't fail").is_not_ready(), "ready without service being made" @@ -330,7 +336,8 @@ mod tests { let addr1 = SocketAddr::from(([127, 0, 0, 2], 80)); reso_tx - .try_send(Update::Add(addr1, ())) + .try_send(Change::Insert(addr1, ())) + .ok() .expect("update must be sent"); assert!( discover.poll().expect("discover can't fail").is_not_ready(), @@ -403,19 +410,18 @@ mod tests { #[test] fn overwriting_insert_cancels_original() { with_task(move || { - let (mut reso_tx, resolution) = mpsc::channel(2); - let (make0_tx, make0_rx) = oneshot::channel::>(); - let (make1_tx, make1_rx) = oneshot::channel::>(); - let make = Svc(vec![make1_rx, make0_rx]); + let (mut reso_tx, reso_rx) = mpsc::channel(2); + let (make0_tx, make0_rx) = oneshot::channel::>>(); + let (make1_tx, make1_rx) = oneshot::channel::>>(); - let mut discover = Discover::new(Urx(resolution), make); + let mut discover = Discover::new(Dx(reso_rx), Svc(vec![make1_rx, make0_rx])); assert!( discover.poll().expect("discover can't fail").is_not_ready(), "ready without updates" ); let addr = SocketAddr::from(([127, 0, 0, 1], 80)); - reso_tx.try_send(Update::Add(addr, ())).unwrap(); + reso_tx.try_send(Change::Insert(addr, ())).ok().unwrap(); assert!( discover.poll().expect("discover can't fail").is_not_ready(), "ready without service being made" @@ -432,7 +438,8 @@ mod tests { ); reso_tx - .try_send(Update::Add(addr, ())) + .try_send(Change::Insert(addr, ())) + .ok() .expect("update must be sent"); assert!( discover.poll().expect("discover can't fail").is_not_ready(), @@ -480,17 +487,19 @@ mod tests { #[test] fn cancelation_of_pending_service() { with_task(move || { - let (mut tx, resolution) = mpsc::channel(1); - let make = service_fn(|()| future::empty::, Error>()); + let (mut tx, reso_rx) = mpsc::channel(1); - let mut discover = Discover::new(Urx(resolution), make); + let mut discover = Discover::new( + Dx(reso_rx), + service_fn(|()| future::empty::, Error>()), + ); assert!( discover.poll().expect("discover can't fail").is_not_ready(), "ready without updates" ); let addr = SocketAddr::from(([127, 0, 0, 1], 80)); - tx.try_send(Update::Add(addr, ())).unwrap(); + tx.try_send(Change::Insert(addr, ())).ok().unwrap(); assert!( discover.poll().expect("discover can't fail").is_not_ready(), "ready without service being made" @@ -501,7 +510,7 @@ mod tests { "no pending cancelation" ); - tx.try_send(Update::Remove(addr)).unwrap(); + tx.try_send(Change::Remove(addr)).ok().unwrap(); match discover.poll().expect("discover can't fail") { Async::NotReady => panic!("remove not processed"), Async::Ready(Change::Insert(..)) => panic!("unexpected insert"), diff --git a/lib/linkerd2-proxy-resolve/Cargo.toml 
b/lib/linkerd2-proxy-resolve/Cargo.toml index 6eb28bd..097c25a 100644 --- a/lib/linkerd2-proxy-resolve/Cargo.toml +++ b/lib/linkerd2-proxy-resolve/Cargo.toml @@ -7,15 +7,10 @@ publish = false [dependencies] futures = "0.1" -linkerd2-addr = { path = "../linkerd2-addr" } -linkerd2-dns-name = { path = "../linkerd2-dns-name" } linkerd2-error = { path = "../linkerd2-error" } -linkerd2-identity = { path = "../linkerd2-identity" } -linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", rev = "ddbc3a4f7f8b0058801f896d27974d19ee98094c" } linkerd2-proxy-core = { path = "../linkerd2-proxy-core" } linkerd2-task = { path = "../linkerd2-task" } -prost = "0.5.0" -tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] } indexmap = "1.0" +tokio = "0.1" +tower = "0.1" tracing = "0.1" -tracing-futures = "0.0.1-alpha.1" diff --git a/lib/linkerd2-proxy-resolve/src/destination/client.rs b/lib/linkerd2-proxy-resolve/src/destination/client.rs deleted file mode 100644 index 42a5182..0000000 --- a/lib/linkerd2-proxy-resolve/src/destination/client.rs +++ /dev/null @@ -1,95 +0,0 @@ -use super::super::remote_stream::{self, Remote}; -use crate::addr::NameAddr; -use crate::api::destination::{client::Destination, GetDestination, Update}; -use futures::{Async, Poll, Stream}; -use std::sync::Arc; -use tower_grpc::{self as grpc, generic::client::GrpcService, BoxBody}; -use tracing::trace; - -/// A client for making service discovery requests to the destination service. -#[derive(Clone)] -pub struct Client { - client: T, - context_token: Arc, -} - -/// A destination service query for a particular name. -/// -/// A `Query` manages the underlying gRPC request and can reconnect itself as necessary. -pub struct Query -where - T: GrpcService, -{ - auth: NameAddr, - client: Client, - query: remote_stream::Remote, -} - -// ===== impl Client ===== - -impl Client -where - T: GrpcService, -{ - fn query(&mut self, dst: &NameAddr, kind: &str) -> Remote { - trace!("{}ing destination service query for {}", kind, dst); - if let Ok(Async::Ready(())) = self.client.poll_ready() { - let req = GetDestination { - scheme: "k8s".into(), - path: format!("{}", dst), - context_token: self.context_token.as_ref().clone(), - }; - let mut svc = Destination::new(self.client.as_service()); - let response = svc.get(grpc::Request::new(req)); - let rx = remote_stream::Receiver::new(response); - Remote::ConnectedOrConnecting { rx } - } else { - trace!("destination client not yet ready"); - Remote::NeedsReconnect - } - } - - /// Returns a destination service query for the given `dst`. - pub fn connect(mut self, dst: &NameAddr) -> Query { - let query = self.query(dst, "connect"); - Query { - auth: dst.clone(), - client: self, - query, - } - } - - pub fn new(client: T, proxy_id: String) -> Self { - Self { - client, - context_token: Arc::new(proxy_id), - } - } -} - -impl Query -where - T: GrpcService, -{ - pub fn authority(&self) -> &NameAddr { - &self.auth - } - - /// Indicates that this query should be reconnected. - pub fn reconnect(&mut self) { - self.query = Remote::NeedsReconnect; - } - - /// Polls the destination service query for updates, reconnecting if necessary. 
- pub fn poll(&mut self) -> Poll, grpc::Status> { - loop { - self.query = match self.query { - Remote::ConnectedOrConnecting { ref mut rx } => return rx.poll(), - Remote::NeedsReconnect => match self.client.query(&self.auth, "reconnect") { - Remote::NeedsReconnect => return Ok(Async::NotReady), - query => query, - }, - } - } - } -} diff --git a/lib/linkerd2-proxy-resolve/src/destination/mod.rs b/lib/linkerd2-proxy-resolve/src/destination/mod.rs deleted file mode 100644 index 9ed7d7e..0000000 --- a/lib/linkerd2-proxy-resolve/src/destination/mod.rs +++ /dev/null @@ -1,163 +0,0 @@ -//! A client for the controller's Destination service. -//! -//! This client is split into two primary components: A `Resolver`, that routers use to -//! initiate service discovery for a given name, and a `background::Process` that -//! satisfies these resolution requests. These components are separated by a channel so -//! that the thread responsible for proxying data need not also do this administrative -//! work of communicating with the control plane. -//! -//! The number of active resolutions is not currently bounded by this module. Instead, we -//! trust that callers of `Resolver` enforce such a constraint (for example, via -//! `linkerd2_proxy_router`'s LRU cache). Additionally, users of this module must ensure -//! they consume resolutions as they are sent so that the response channels don't grow -//! without bounds. -//! -//! Furthermore, there are not currently any bounds on the number of endpoints that may be -//! returned for a single resolution. It is expected that the Destination service enforce -//! some reasonable upper bounds. -//! -//! ## TODO -//! -//! - Given that the underlying gRPC client has some max number of concurrent streams, we -//! actually do have an upper bound on concurrent resolutions. This needs to be made -//! more explicit. -//! - We need some means to limit the number of endpoints that can be returned for a -//! single resolution so that `control::Cache` is not effectively unbounded. - -use crate::addr::NameAddr; -use crate::core::resolve::Resolve; -use crate::dns; -use crate::identity; -use indexmap::IndexMap; -use std::sync::Arc; -use tower_grpc::{generic::client::GrpcService, Body, BoxBody}; -use tracing::trace; - -mod client; -mod resolution; - -use self::client::Client; -pub use self::resolution::{Resolution, ResolveFuture, Unresolvable}; - -/// A handle to request resolutions from the destination service. -#[derive(Clone)] -pub struct Resolver { - client: Client, - suffixes: Arc>, -} - -/// Metadata describing an endpoint. -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct Metadata { - /// An endpoint's relative weight. - /// - /// A weight of 0 means that the endpoint should never be preferred over a - /// non 0-weighted endpoint. - /// - /// The default weight, corresponding to 1.0, is 10,000. This enables us to - /// specify weights as small as 0.0001 and as large as 400,000+. - /// - /// A float is not used so that this type can implement `Eq`. - weight: u32, - - /// Arbitrary endpoint labels. Primarily used for telemetry. - labels: IndexMap, - - /// A hint from the controller about what protocol (HTTP1, HTTP2, etc) the - /// destination understands. - protocol_hint: ProtocolHint, - - /// How to verify TLS for the endpoint. - identity: Option, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum ProtocolHint { - /// We don't what the destination understands, so forward messages in the - /// protocol we received them in. 
- Unknown, - /// The destination can receive HTTP2 messages. - Http2, -} - -// ==== impl Resolver ===== - -impl Resolver -where - T: GrpcService + Clone + Send + 'static, - T::ResponseBody: Send, - ::Data: Send, - T::Future: Send, -{ - /// Returns a `Resolver` for requesting destination resolutions. - pub fn new(client: T, suffixes: Vec, proxy_id: String) -> Self { - Self { - suffixes: Arc::new(suffixes), - client: Client::new(client, proxy_id), - } - } -} - -impl Resolve for Resolver -where - T: GrpcService + Clone + Send + 'static, - T::ResponseBody: Send, - ::Data: Send, - T::Future: Send, -{ - type Endpoint = Metadata; - type Resolution = Resolution; - type Future = ResolveFuture; - - /// Start watching for address changes for a certain authority. - fn resolve(&self, authority: &NameAddr) -> Self::Future { - trace!("resolve; authority={:?}", authority); - - if self.suffixes.iter().any(|s| s.contains(authority.name())) { - return ResolveFuture::new(authority, self.client.clone()); - } else { - trace!("-> authority {} not in search suffixes", authority); - } - ResolveFuture::unresolvable() - } -} - -// ===== impl Metadata ===== - -impl Metadata { - pub fn empty() -> Self { - Self { - labels: IndexMap::default(), - protocol_hint: ProtocolHint::Unknown, - identity: None, - weight: 10_000, - } - } - - pub fn new( - labels: IndexMap, - protocol_hint: ProtocolHint, - identity: Option, - weight: u32, - ) -> Self { - Self { - labels, - protocol_hint, - identity, - weight, - } - } - - /// Returns the endpoint's labels from the destination service, if it has them. - pub fn labels(&self) -> &IndexMap { - &self.labels - } - - pub fn protocol_hint(&self) -> ProtocolHint { - self.protocol_hint - } - - pub fn identity(&self) -> Option<&identity::Name> { - self.identity.as_ref() - } -} diff --git a/lib/linkerd2-proxy-resolve/src/destination/resolution.rs b/lib/linkerd2-proxy-resolve/src/destination/resolution.rs deleted file mode 100644 index ecf8742..0000000 --- a/lib/linkerd2-proxy-resolve/src/destination/resolution.rs +++ /dev/null @@ -1,443 +0,0 @@ -use super::client; -use crate::addr::NameAddr; -use crate::api::{ - destination::{ - protocol_hint::Protocol, update::Update as PbUpdate2, TlsIdentity, Update as PbUpdate, - WeightedAddr, - }, - net::TcpAddress, -}; -use crate::core::resolve::{self, Update}; -use crate::destination::{Metadata, ProtocolHint}; -use crate::{identity, task, Never}; -use futures::{ - future::Future, - sync::{mpsc, oneshot}, - Async, Poll, Stream, -}; -use indexmap::{IndexMap, IndexSet}; -use std::{collections::HashMap, error::Error, fmt, net::SocketAddr}; -use tower_grpc::{self as grpc, generic::client::GrpcService, Body, BoxBody}; -use tracing::{debug, info_span, trace, warn}; -use tracing_futures::Instrument; - -/// A resolution for a single authority. -pub struct Resolution { - rx: mpsc::UnboundedReceiver>, - _hangup: oneshot::Sender, -} - -pub struct ResolveFuture -where - T: GrpcService, -{ - query: Option>, -} - -/// An error indicating that the Destination service cannot resolve the -/// requested name. -#[derive(Debug)] -pub struct Unresolvable { - _p: (), -} - -/// Drives the query associated with a `Resolution`. -/// -/// Each destination service query is driven by its own background `Daemon`, -/// rather than in `Resolution::poll`, so that changes in the discovered -/// endpoints are handled as they are received, rather than only when polling -/// the resolution. 
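The deleted `Daemon` exists so that the gRPC stream is driven on its own background task and pushes updates over a channel as they arrive, rather than only when the resolution is polled. A minimal, synchronous sketch of that pattern using only the standard library; the real code uses futures 0.1 tasks and an unbounded channel, and the `Update` type and addresses here are placeholders:

use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the Destination API's update type.
#[derive(Debug)]
enum Update {
    Add(&'static str),
    Remove(&'static str),
}

fn main() {
    // The daemon owns the sender; the resolution holds the receiver.
    let (tx, rx) = mpsc::channel::<Update>();

    // Background "daemon": drives the discovery stream and forwards every
    // change as it arrives, rather than only when the consumer polls.
    let daemon = thread::spawn(move || {
        let stream = vec![Update::Add("10.1.1.1:8080"), Update::Remove("10.1.1.1:8080")];
        for update in stream {
            if tx.send(update).is_err() {
                // The resolution was dropped; stop driving the stream.
                break;
            }
        }
    });

    // Consumer: observes buffered updates whenever it polls the channel.
    while let Ok(update) = rx.recv() {
        println!("observed {:?}", update);
    }
    daemon.join().unwrap();
}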
-struct Daemon -where - T: GrpcService, -{ - query: client::Query, - updater: Updater, -} - -/// Updates the `Resolution` when the set of discovered endpoints changes. -/// -/// This is more than just the send end of the channel, as it also tracks the -/// state necessary to reset stale endpoints when reconnecting. -struct Updater { - tx: mpsc::UnboundedSender>, - /// This receiver is used to detect if the resolution has been dropped. - hangup: oneshot::Receiver, - /// All the endpoint addresses seen since the last reset. - seen: IndexSet, - /// Set to true on reconnects to indicate that previously seen addresses - /// should be reset when the query reconnects. - reset: bool, -} - -#[derive(Clone, Debug)] -struct LogCtx(NameAddr); - -struct DisplayUpdate<'a>(&'a Update); - -impl resolve::Resolution for Resolution { - type Endpoint = Metadata; - type Error = Never; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.rx.poll() { - Ok(Async::Ready(Some(up))) => Ok(Async::Ready(up)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) | Ok(Async::Ready(None)) => { - trace!("resolution daemon has terminated"); - Ok(Async::NotReady) - } - } - } -} - -impl Resolution { - fn new() -> (Self, Updater) { - let (tx, rx) = mpsc::unbounded(); - - // This oneshot allows the daemon to be notified when the Self::Stream - // is dropped. - let (hangup_tx, hangup_rx) = oneshot::channel(); - let resolution = Self { - rx, - _hangup: hangup_tx, - }; - (resolution, Updater::new(tx, hangup_rx)) - } -} - -// ===== impl ResolveFuture ===== - -impl ResolveFuture -where - T: GrpcService + Send, -{ - pub(super) fn new(authority: &NameAddr, client: client::Client) -> Self { - Self { - query: Some(client.connect(&authority)), - } - } - - pub(super) fn unresolvable() -> Self { - Self { query: None } - } -} - -impl Future for ResolveFuture -where - T: GrpcService + Send + 'static, - T::ResponseBody: Send, - ::Data: Send, - T::Future: Send, -{ - type Item = Resolution; - type Error = Unresolvable; - - fn poll(&mut self) -> Poll { - loop { - let update = match self.query { - Some(ref mut query) => match query.poll() { - Ok(Async::Ready(Some(up))) => up, - Ok(Async::Ready(None)) => { - warn!("Destination.Get stream ended immediately, must reconnect"); - query.reconnect(); - continue; - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(ref status) if status.code() == grpc::Code::InvalidArgument => { - trace!("{} is unresolvable", query.authority()); - return Err(Unresolvable { _p: () }); - } - Err(err) => { - warn!("Destination.Get stream error {}, must reconnect", err); - query.reconnect(); - continue; - } - }, - None => { - trace!("name is unresolvable"); - return Err(Unresolvable { _p: () }); - } - }; - - let (res, mut updater) = Resolution::new(); - updater - .update(update) - .expect("resolution should not have been dropped"); - - let query = self.query.take().expect("invalid state"); - - let authority = query.authority().clone(); - let fut = Daemon { updater, query }.instrument(info_span!("resolve", %authority)); - task::spawn(fut); - - return Ok(Async::Ready(res)); - } - } -} - -// ===== impl Daemon ===== - -impl Future for Daemon -where - T: GrpcService, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - match self.updater.hangup.poll() { - Ok(Async::Ready(never)) => match never {}, // unreachable! - Ok(Async::NotReady) => {} - Err(_) => { - // Hangup tx has been dropped. 
- debug!("resolution cancelled"); - return Ok(Async::Ready(())); - } - }; - - loop { - match self.query.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(Some(update))) => { - if let Err(_) = self.updater.update(update) { - trace!("resolution dropped, daemon terminating..."); - return Ok(Async::Ready(())); - } - } - Ok(Async::Ready(None)) => { - trace!("Destination.Get stream ended, must reconnect"); - self.updater.should_reset(); - self.query.reconnect(); - } - Err(err) => { - warn!("Destination.Get stream error: {}", err); - self.updater.should_reset(); - self.query.reconnect(); - } - } - } - } -} - -// ===== impl Updater ===== - -impl Updater { - fn new(tx: mpsc::UnboundedSender>, hangup: oneshot::Receiver) -> Self { - Self { - tx, - hangup, - seen: IndexSet::new(), - reset: false, - } - } - - fn update(&mut self, update: PbUpdate) -> Result<(), ()> { - match update.update { - Some(PbUpdate2::Add(a_set)) => { - let set_labels = a_set.metric_labels; - let addrs = a_set - .addrs - .into_iter() - .filter_map(|pb| pb_to_addr_meta(pb, &set_labels)); - self.add(addrs)?; - } - Some(PbUpdate2::Remove(r_set)) => { - let addrs = r_set.addrs.into_iter().filter_map(pb_to_sock_addr); - self.remove(addrs)?; - } - Some(PbUpdate2::NoEndpoints(_)) => { - trace!("has no endpoints"); - self.remove_all("no endpoints")?; - } - None => (), - } - Ok(()) - } - - fn send(&mut self, update: Update) -> Result<(), ()> { - trace!("{}", DisplayUpdate(&update)); - self.tx.unbounded_send(update).map_err(|_| ()) - } - - /// Indicates that the resolution should reset any previously discovered - /// endpoints on the next update received after a reconnect. - fn should_reset(&mut self) { - self.reset = true; - } - - /// Removes any previously discovered endpoints if they are stale. - /// Otherwise, does nothing. - /// - /// This is called when processing a new update. - fn reset_if_needed(&mut self) -> Result<(), ()> { - if self.reset { - trace!("query reconnected; removing stale endpoints"); - self.remove_all("stale")?; - self.reset = false; - } - Ok(()) - } - - fn add(&mut self, addrs: impl Iterator) -> Result<(), ()> { - self.reset_if_needed()?; - for (addr, meta) in addrs { - self.seen.insert(addr); - self.send(Update::Add(addr, meta))?; - } - Ok(()) - } - - fn remove(&mut self, addrs: impl Iterator) -> Result<(), ()> { - self.reset_if_needed()?; - for addr in addrs { - self.seen.remove(&addr); - self.send(Update::Remove(addr))?; - } - Ok(()) - } - - fn remove_all(&mut self, reason: &'static str) -> Result<(), ()> { - for addr in self.seen.drain(..) { - trace!("remove {} ({})", addr, reason); - self.tx - .unbounded_send(Update::Remove(addr)) - .map_err(|_| ())?; - } - Ok(()) - } -} - -impl<'a> fmt::Display for DisplayUpdate<'a> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0 { - Update::Remove(ref addr) => write!(f, "remove {}", addr), - Update::Add(ref addr, ..) => write!(f, "add {}", addr), - } - } -} - -impl fmt::Display for LogCtx { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "resolver addr={}", self.0) - } -} - -// === impl Unresolvable === - -impl Unresolvable { - pub fn new() -> Self { - Self { _p: () } - } -} - -impl fmt::Display for Unresolvable { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "this name cannot be resolved by the destination service".fmt(f) - } -} - -impl Error for Unresolvable {} - -/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. 
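The conversion deleted below unpacks the API's two 64-bit IPv6 halves into 16 explicit octets. For reference, a standalone sketch of the same conversion: combining the halves into a `u128` produces the identical network-order address. The function name is illustrative; the `first`/`last` naming mirrors the deleted code.

use std::net::{Ipv6Addr, SocketAddr};

/// Builds a socket address from the two 64-bit halves used by the proxy API's
/// IPv6 representation. Folding them into a `u128` yields the same 16
/// network-order octets as shifting each half out byte by byte.
fn ipv6_from_halves(first: u64, last: u64, port: u16) -> SocketAddr {
    let ip = Ipv6Addr::from(((first as u128) << 64) | last as u128);
    SocketAddr::from((ip, port))
}

fn main() {
    // 2001:db8::1 split into its high and low halves.
    let addr = ipv6_from_halves(0x2001_0db8_0000_0000, 0x0000_0000_0000_0001, 8080);
    assert_eq!(addr.to_string(), "[2001:db8::1]:8080");
    println!("{}", addr);
}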
-fn pb_to_addr_meta( - pb: WeightedAddr, - set_labels: &HashMap, -) -> Option<(SocketAddr, Metadata)> { - let addr = pb.addr.and_then(pb_to_sock_addr)?; - - let meta = { - let mut t = set_labels - .iter() - .chain(pb.metric_labels.iter()) - .collect::>(); - t.sort_by(|(k0, _), (k1, _)| k0.cmp(k1)); - - let mut m = IndexMap::with_capacity(t.len()); - for (k, v) in t.into_iter() { - m.insert(k.clone(), v.clone()); - } - - m - }; - - let mut proto_hint = ProtocolHint::Unknown; - if let Some(hint) = pb.protocol_hint { - if let Some(proto) = hint.protocol { - match proto { - Protocol::H2(..) => { - proto_hint = ProtocolHint::Http2; - } - } - } - } - - let tls_id = pb.tls_identity.and_then(pb_to_id); - let meta = Metadata::new(meta, proto_hint, tls_id, pb.weight); - Some((addr, meta)) -} - -fn pb_to_id(pb: TlsIdentity) -> Option { - use crate::api::destination::tls_identity::Strategy; - - let Strategy::DnsLikeIdentity(i) = pb.strategy?; - match identity::Name::from_hostname(i.name.as_bytes()) { - Ok(i) => Some(i), - Err(_) => { - warn!("Ignoring invalid identity: {}", i.name); - None - } - } -} - -fn pb_to_sock_addr(pb: TcpAddress) -> Option { - use crate::api::net::ip_address::Ip; - use std::net::{Ipv4Addr, Ipv6Addr}; - /* - current structure is: - TcpAddress { - ip: Option, - }>, - port: u32, - } - */ - match pb.ip { - Some(ip) => match ip.ip { - Some(Ip::Ipv4(octets)) => { - let ipv4 = Ipv4Addr::from(octets); - Some(SocketAddr::from((ipv4, pb.port as u16))) - } - Some(Ip::Ipv6(v6)) => { - let octets = [ - (v6.first >> 56) as u8, - (v6.first >> 48) as u8, - (v6.first >> 40) as u8, - (v6.first >> 32) as u8, - (v6.first >> 24) as u8, - (v6.first >> 16) as u8, - (v6.first >> 8) as u8, - v6.first as u8, - (v6.last >> 56) as u8, - (v6.last >> 48) as u8, - (v6.last >> 40) as u8, - (v6.last >> 32) as u8, - (v6.last >> 24) as u8, - (v6.last >> 16) as u8, - (v6.last >> 8) as u8, - v6.last as u8, - ]; - let ipv6 = Ipv6Addr::from(octets); - Some(SocketAddr::from((ipv6, pb.port as u16))) - } - None => None, - }, - None => None, - } -} diff --git a/lib/linkerd2-proxy-resolve/src/lib.rs b/lib/linkerd2-proxy-resolve/src/lib.rs index ee5d0d5..af8e9ad 100644 --- a/lib/linkerd2-proxy-resolve/src/lib.rs +++ b/lib/linkerd2-proxy-resolve/src/lib.rs @@ -1,12 +1,4 @@ -use linkerd2_addr as addr; -use linkerd2_dns_name as dns; -use linkerd2_error::Never; -use linkerd2_identity as identity; -use linkerd2_proxy_api as api; -use linkerd2_proxy_core as core; -use linkerd2_task as task; +#![deny(warnings, rust_2018_idioms)] -mod destination; -mod remote_stream; - -pub use self::destination::{Metadata, ProtocolHint, Resolver, Unresolvable}; +pub mod map_endpoint; +pub mod recover; diff --git a/lib/linkerd2-proxy-resolve/src/map_endpoint.rs b/lib/linkerd2-proxy-resolve/src/map_endpoint.rs new file mode 100644 index 0000000..c0b6cde --- /dev/null +++ b/lib/linkerd2-proxy-resolve/src/map_endpoint.rs @@ -0,0 +1,136 @@ +//! A middleware that wraps `Resolutions`, modifying their endpoint type. 
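Concretely, the trait introduced by this module takes the resolved target, the endpoint's socket address, and the resolver's per-endpoint value, and maps them into whatever endpoint type the balancer needs. A self-contained sketch: the trait shape is restated from this module, while `IntoEndpoint` and the `Endpoint` fields are hypothetical, loosely modeled on the `FromMetadata` mapper added to `outbound::endpoint` later in this change.

use std::net::SocketAddr;

// Minimal restatement of the trait's shape for illustration; the real trait
// is generic over the target type and the discovered endpoint type.
trait MapEndpoint<Target, In> {
    type Out;
    fn map_endpoint(&self, target: &Target, addr: SocketAddr, ep: In) -> Self::Out;
}

// Hypothetical mapper: combines the resolver's per-endpoint value with the
// logical target to build the proxy-facing endpoint type.
struct IntoEndpoint;

#[derive(Debug)]
struct Endpoint {
    addr: SocketAddr,
    dst_name: String,
    weight: u32,
}

impl MapEndpoint<String, u32> for IntoEndpoint {
    type Out = Endpoint;

    fn map_endpoint(&self, target: &String, addr: SocketAddr, weight: u32) -> Endpoint {
        Endpoint {
            addr,
            dst_name: target.clone(),
            weight,
        }
    }
}

fn main() {
    let ep = IntoEndpoint.map_endpoint(
        &"web.example.svc.cluster.local:80".to_string(),
        ([10, 1, 1, 1], 8080).into(),
        10_000,
    );
    println!("{:?}", ep);
}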
+ +use futures::{try_ready, Async, Future, Poll}; +use linkerd2_proxy_core::resolve; +use std::net::SocketAddr; + +pub trait MapEndpoint { + type Out; + fn map_endpoint(&self, target: &Target, addr: SocketAddr, in_ep: In) -> Self::Out; +} + +#[derive(Clone, Debug)] +pub struct Resolve { + resolve: R, + map: M, +} + +#[derive(Debug)] +pub struct ResolveFuture { + future: F, + target: Option, + map: Option, +} + +#[derive(Clone, Debug)] +pub struct Resolution { + resolution: R, + target: T, + map: M, +} + +// === impl Resolve === + +impl Resolve { + pub fn new(map: M, resolve: R) -> Self + where + Self: resolve::Resolve, + { + Self { resolve, map } + } +} + +impl tower::Service for Resolve +where + T: Clone, + R: resolve::Resolve, + M: MapEndpoint + Clone, +{ + type Response = Resolution; + type Error = R::Error; + type Future = ResolveFuture; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.resolve.poll_ready() + } + + #[inline] + fn call(&mut self, target: T) -> Self::Future { + let future = self.resolve.resolve(target.clone()); + Self::Future { + future, + target: Some(target), + map: Some(self.map.clone()), + } + } +} + +// === impl ResolveFuture === + +impl Future for ResolveFuture +where + F: Future, + F::Item: resolve::Resolution, + M: MapEndpoint::Endpoint>, +{ + type Item = Resolution; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let resolution = try_ready!(self.future.poll()); + let target = self.target.take().expect("polled after ready"); + let map = self.map.take().expect("polled after ready"); + Ok(Async::Ready(Resolution { + resolution, + target, + map, + })) + } +} + +// === impl Resolution === + +impl resolve::Resolution for Resolution +where + R: resolve::Resolution, + M: MapEndpoint, +{ + type Endpoint = M::Out; + type Error = R::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let update = match try_ready!(self.resolution.poll()) { + resolve::Update::Add(eps) => resolve::Update::Add( + eps.into_iter() + .map(|(a, ep)| { + let ep = self.map.map_endpoint(&self.target, a, ep); + (a, ep) + }) + .collect(), + ), + resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs), + resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, + resolve::Update::Empty => resolve::Update::Empty, + }; + Ok(update.into()) + } +} + +// === impl MapEndpoint === + +impl MapEndpoint for () { + type Out = N; + + fn map_endpoint(&self, _: &T, _: SocketAddr, ep: N) -> Self::Out { + ep + } +} + +impl Out> MapEndpoint for F { + type Out = Out; + + fn map_endpoint(&self, target: &T, addr: SocketAddr, ep: In) -> Self::Out { + (self)(target, addr, ep) + } +} diff --git a/lib/linkerd2-proxy-resolve/src/recover.rs b/lib/linkerd2-proxy-resolve/src/recover.rs new file mode 100644 index 0000000..25aabc6 --- /dev/null +++ b/lib/linkerd2-proxy-resolve/src/recover.rs @@ -0,0 +1,489 @@ +//! A middleware that recovers a resolution after some failures. 
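In outline, the new middleware works like this: when the resolution stream fails, it asks a recovery policy whether the error is retryable; if so, it backs off and re-resolves, reconciling the cached endpoints against the fresh snapshot; otherwise it surfaces the error. A deliberately simplified, synchronous sketch of that decision loop follows. The real implementation is a futures-based state machine whose backoff is a stream rather than a `Duration`, and the error cases, names, and addresses here are placeholders.

use std::thread;
use std::time::Duration;

#[derive(Debug)]
enum ResolveError {
    Unresolvable, // e.g. the Destination service rejects the name outright
    Disconnected, // e.g. the gRPC stream was reset
}

/// Decides whether an error is worth retrying; returns how long to wait.
fn recover(err: &ResolveError) -> Result<Duration, ()> {
    match err {
        ResolveError::Unresolvable => Err(()), // fatal: give up
        ResolveError::Disconnected => Ok(Duration::from_millis(100)),
    }
}

/// Toy resolver: the first attempt fails with a transient error.
fn resolve(name: &str, attempt: u32) -> Result<Vec<&'static str>, ResolveError> {
    if attempt == 0 {
        Err(ResolveError::Disconnected)
    } else if name.ends_with(".svc.cluster.local") {
        Ok(vec!["10.1.1.1:8080", "10.1.1.2:8080"])
    } else {
        Err(ResolveError::Unresolvable)
    }
}

fn resolve_with_recovery(name: &str) -> Result<Vec<&'static str>, ResolveError> {
    let mut attempt = 0;
    loop {
        match resolve(name, attempt) {
            Ok(eps) => return Ok(eps),
            Err(err) => match recover(&err) {
                Ok(backoff) => {
                    // Recoverable: back off, then reconnect and try again.
                    thread::sleep(backoff);
                    attempt += 1;
                }
                // Unrecoverable: surface the error to the caller.
                Err(()) => return Err(err),
            },
        }
    }
}

fn main() {
    let eps = resolve_with_recovery("web.default.svc.cluster.local").unwrap();
    println!("resolved: {:?}", eps);
    assert!(resolve_with_recovery("example.com").is_err());
}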
+ +use futures::{try_ready, Async, Future, Poll, Stream}; +use indexmap::IndexMap; +use linkerd2_error::{Error, Recover}; +use linkerd2_proxy_core::resolve::{self, Resolution as _, Update}; +use std::net::SocketAddr; + +#[derive(Clone, Debug)] +pub struct Resolve { + resolve: R, + recover: E, +} + +pub struct ResolveFuture> { + inner: Option>, +} + +pub struct Resolution> { + inner: Inner, + cache: IndexMap, + reconcile: Option>, +} + +struct Inner> { + target: T, + resolve: R, + recover: E, + state: State, +} + +// #[derive(Debug)] +// struct Cache { +// active: IndexMap, +// } + +enum State { + Disconnected { + backoff: Option, + }, + Connecting { + future: F, + backoff: Option, + }, + // XXX This state shouldn't be necessary, but we need it to pass tests(!) + // that don't properly mimic the go server's behavior. See + // linkerd/linkerd2#3362. + Pending { + resolution: Option, + backoff: Option, + }, + Connected { + resolution: R, + initial: Option>, + }, + Recover { + error: Option, + backoff: Option, + }, + Backoff(Option), +} + +// === impl Resolve === + +impl Resolve { + pub fn new(recover: E, resolve: R) -> Self + where + Self: resolve::Resolve, + { + Self { resolve, recover } + } +} + +impl tower::Service for Resolve +where + T: Clone, + R: resolve::Resolve + Clone, + R::Endpoint: Clone + PartialEq, + E: Recover + Clone, +{ + type Response = Resolution; + type Error = Error; + type Future = ResolveFuture; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.resolve.poll_ready().map_err(Into::into) + } + + #[inline] + fn call(&mut self, target: T) -> Self::Future { + let future = self.resolve.resolve(target.clone()); + + Self::Future { + inner: Some(Inner { + state: State::Connecting { + future, + backoff: None, + }, + target: target.clone(), + recover: self.recover.clone(), + resolve: self.resolve.clone(), + }), + } + } +} + +// === impl ResolveFuture === + +impl Future for ResolveFuture +where + T: Clone, + R: resolve::Resolve, + R::Endpoint: Clone + PartialEq, + E: Recover, +{ + type Item = Resolution; + type Error = Error; + + fn poll(&mut self) -> Poll { + // Wait until the resolution is connected. + try_ready!(self + .inner + .as_mut() + .expect("polled after complete") + .poll_connected()); + + Ok(Async::Ready(Resolution { + inner: self.inner.take().expect("polled after complete"), + cache: IndexMap::default(), + //cache: Cache::default(), + reconcile: None, + })) + } +} + +// === impl Resolution === + +impl resolve::Resolution for Resolution +where + T: Clone, + R: resolve::Resolve, + R::Endpoint: Clone + PartialEq, + E: Recover, +{ + type Endpoint = R::Endpoint; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + // If a reconciliation update is buffered (i.e. after + // reconcile_after_reconnect), process it immediately. + if let Some(update) = self.reconcile.take() { + self.update_active(&update); + return Ok(update.into()); + } + + if let State::Connected { + ref mut resolution, + ref mut initial, + } = self.inner.state + { + // XXX Due to linkerd/linkerd2#3362, errors can't be discovered + // eagerly, so we must potentially read the first update to be + // sure it didn't fail. If that's the case, then reconcile the + // cache against the initial update. + if let Some(initial) = initial.take() { + // The initial state afer a reconnect may be identitical to + // the prior state, and so there may be no updates to + // advertise. 
+ if let Some((update, reconcile)) = reconcile_after_connect(&self.cache, initial) + { + self.reconcile = reconcile; + self.update_active(&update); + return Ok(update.into()); + } + } + + // Process the resolution stream, updating the cache. + // + // Attempt recovery/backoff if the resolution fails. + match resolve::Resolution::poll(resolution) { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(update)) => { + self.update_active(&update); + return Ok(update.into()); + } + Err(e) => { + self.inner.state = State::Recover { + error: Some(e.into()), + backoff: None, + }; + } + } + } + + try_ready!(self.inner.poll_connected()); + } + } +} + +impl Resolution +where + T: Clone, + R: resolve::Resolve, + R::Endpoint: Clone + PartialEq, + E: Recover, +{ + fn update_active(&mut self, update: &Update) { + match update { + Update::Add(ref endpoints) => { + self.cache.extend(endpoints.clone()); + } + Update::Remove(ref addrs) => { + for addr in addrs.iter() { + self.cache.remove(addr); + } + } + Update::DoesNotExist | Update::Empty => { + self.cache.drain(..); + } + } + } +} + +// === impl Inner === + +impl Inner +where + T: Clone, + R: resolve::Resolve, + R::Endpoint: Clone + PartialEq, + E: Recover, +{ + /// Drives the state forward until its connected. + fn poll_connected(&mut self) -> Poll<(), Error> { + loop { + self.state = match self.state { + // When disconnected, start connecting. + // + // If we're recovering from a previous failure, we retain the + // backoff in case this connection attempt fails. + State::Disconnected { ref mut backoff } => { + tracing::trace!("connecting"); + let future = self.resolve.resolve(self.target.clone()); + State::Connecting { + future, + backoff: backoff.take(), + } + } + + State::Connecting { + ref mut future, + ref mut backoff, + } => match future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(resolution)) => { + tracing::trace!("pending"); + State::Pending { + resolution: Some(resolution), + backoff: backoff.take(), + } + } + Err(e) => State::Recover { + error: Some(e.into()), + backoff: backoff.take(), + }, + }, + + // We've already connected, but haven't yet received an update + // (or an error). This state shouldn't exist. See + // linkerd/linkerd2#3362. + State::Pending { + ref mut resolution, + ref mut backoff, + } => match resolution.as_mut().unwrap().poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => State::Recover { + error: Some(e.into()), + backoff: backoff.take(), + }, + Ok(Async::Ready(initial)) => { + tracing::trace!("connected"); + State::Connected { + resolution: resolution.take().unwrap(), + initial: Some(initial), + } + } + }, + + State::Connected { .. } => return Ok(Async::Ready(())), + + // If any stage failed, try to recover. If the error is + // recoverable, start (or continue) backing off... + State::Recover { + ref mut error, + ref mut backoff, + } => { + let err = error.take().expect("illegal state"); + tracing::debug!(message = %err); + let new_backoff = self.recover.recover(err)?; + State::Backoff(backoff.take().or(Some(new_backoff))) + } + + State::Backoff(ref mut backoff) => { + // If the backoff fails, it's not recoverable. + match backoff + .as_mut() + .expect("illegal state") + .poll() + .map_err(Into::into)? 
+ { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(unit) => { + tracing::trace!("disconnected"); + let backoff = if unit.is_some() { backoff.take() } else { None }; + State::Disconnected { backoff } + } + } + } + }; + } + } +} + +/// Computes the updates needed after a connection is (re-)established. +// Raw fn for easier testing. +fn reconcile_after_connect( + cache: &IndexMap, + initial: Update, +) -> Option<(Update, Option>)> { + match initial { + // When the first update after a disconnect is an Add, it should + // contain the new state of the replica set. + Update::Add(endpoints) => { + let mut new_eps = endpoints.into_iter().collect::>(); + let mut rm_addrs = Vec::with_capacity(cache.len()); + for (addr, endpoint) in cache.iter() { + match new_eps.get(addr) { + // If the endpoint is in the active set and not in + // the new set, it needs to be removed. + None => { + rm_addrs.push(*addr); + } + // If the endpoint is already in the active set, + // remove it from the new set (to avoid rebuilding + // services unnecessarily). + Some(ep) => { + // The endpoints must be identitical, though. + if *ep == *endpoint { + new_eps.remove(addr); + } + } + } + } + let add = if new_eps.is_empty() { + None + } else { + Some(Update::Add(new_eps.into_iter().collect())) + }; + let rm = if rm_addrs.is_empty() { + None + } else { + Some(Update::Remove(rm_addrs)) + }; + // Advertise adds before removes so that we don't unnecessarily + // empty out a consumer. + match add { + Some(add) => Some((add, rm)), + None => rm.map(|rm| (rm, None)), + } + } + // It would be exceptionally odd to get a remove, specifically, + // immediately after a reconnect, but it seems appropriate to + // handle it as Empty. + Update::Remove(..) | Update::Empty => Some((Update::Empty, None)), + Update::DoesNotExist => Some((Update::DoesNotExist, None)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + pub fn addr0() -> SocketAddr { + ([198, 51, 100, 1], 8080).into() + } + + pub fn addr1() -> SocketAddr { + ([198, 51, 100, 2], 8080).into() + } + + #[test] + fn reconcile_after_initial_connect() { + let cache = IndexMap::default(); + let add = Update::Add(vec![(addr0(), 0), (addr1(), 0)]); + assert_eq!( + reconcile_after_connect(&cache, add.clone()), + Some((add, None)), + "Adds should be passed through initially" + ); + assert_eq!( + reconcile_after_connect(&cache, Update::Remove(vec![addr0(), addr1()])), + Some((Update::Empty, None)), + "Removes should be treated as empty" + ); + assert_eq!( + reconcile_after_connect(&cache, Update::Empty), + Some((Update::Empty, None)), + "Empties should be passed through" + ); + assert_eq!( + reconcile_after_connect(&cache, Update::DoesNotExist), + Some((Update::DoesNotExist, None)), + "DNEs should be passed through" + ); + } + + #[test] + fn reconcile_after_reconnect_dedupes() { + let mut cache = IndexMap::new(); + cache.insert(addr0(), 0); + + assert_eq!( + reconcile_after_connect(&cache, Update::Add(vec![(addr0(), 0), (addr1(), 0)])), + Some((Update::Add(vec![(addr1(), 0)]), None)), + ); + } + + #[test] + fn reconcile_after_reconnect_updates() { + let mut cache = IndexMap::new(); + cache.insert(addr0(), 0); + + assert_eq!( + reconcile_after_connect(&cache, Update::Add(vec![(addr0(), 1), (addr1(), 0)])), + Some((Update::Add(vec![(addr0(), 1), (addr1(), 0)]), None)), + ); + } + + #[test] + fn reconcile_after_reconnect_removes() { + let mut cache = IndexMap::new(); + cache.insert(addr0(), 0); + cache.insert(addr1(), 0); + + assert_eq!( + reconcile_after_connect(&cache, 
Update::Add(vec![(addr0(), 0)])), + Some((Update::Remove(vec![addr1()]), None)) + ); + } + + #[test] + fn reconcile_after_reconnect_adds_and_removes() { + let mut cache = IndexMap::new(); + cache.insert(addr0(), 0); + cache.insert(addr1(), 0); + + assert_eq!( + reconcile_after_connect(&cache, Update::Add(vec![(addr0(), 1)])), + Some(( + Update::Add(vec![(addr0(), 1)]), + Some(Update::Remove(vec![addr1()])) + )) + ); + } + + #[test] + fn reconcile_after_reconnect_passthru() { + let mut cache = IndexMap::default(); + cache.insert(addr0(), 0); + + assert_eq!( + reconcile_after_connect(&cache, Update::Remove(vec![addr1()])), + Some((Update::Empty, None)), + "Removes should be treated as empty" + ); + assert_eq!( + reconcile_after_connect(&cache, Update::Empty), + Some((Update::Empty, None)), + "Empties should be passed through" + ); + assert_eq!( + reconcile_after_connect(&cache, Update::DoesNotExist), + Some((Update::DoesNotExist, None)), + "DNEs should be passed through" + ); + } +} diff --git a/lib/linkerd2-proxy-resolve/src/remote_stream.rs b/lib/linkerd2-proxy-resolve/src/remote_stream.rs deleted file mode 100644 index 01f2064..0000000 --- a/lib/linkerd2-proxy-resolve/src/remote_stream.rs +++ /dev/null @@ -1,67 +0,0 @@ -use futures::{try_ready, Future, Poll, Stream}; -use prost::Message; -use tower_grpc::{ - self as grpc, client::server_streaming::ResponseFuture, generic::client::GrpcService, BoxBody, - Streaming, -}; - -/// Tracks the state of a gRPC response stream from a remote. -/// -/// A remote may hold a `Receiver` that can be used to read `M`-typed messages from the -/// remote stream. -pub enum Remote -where - S: GrpcService, -{ - NeedsReconnect, - ConnectedOrConnecting { rx: Receiver }, -} - -/// Receives streaming RPCs updates. -/// -/// Streaming gRPC endpoints return a `ResponseFuture` whose item is a `Response`. -/// A `Receiver` holds the state of that RPC call, exposing a `Stream` that drives both -/// the gRPC response and its streaming body. -pub struct Receiver -where - S: GrpcService, -{ - rx: Rx, -} - -enum Rx -where - S: GrpcService, -{ - Waiting(ResponseFuture), - Streaming(Streaming), -} - -// ===== impl Receiver ===== - -impl> Receiver { - pub fn new(future: ResponseFuture) -> Self { - Receiver { - rx: Rx::Waiting(future), - } - } -} - -impl> Stream for Receiver { - type Item = M; - type Error = grpc::Status; - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - let stream = match self.rx { - Rx::Waiting(ref mut future) => try_ready!(future.poll()).into_inner(), - - Rx::Streaming(ref mut stream) => { - return stream.poll(); - } - }; - - self.rx = Rx::Streaming(stream); - } - } -} diff --git a/lib/linkerd2-request-filter/Cargo.toml b/lib/linkerd2-request-filter/Cargo.toml new file mode 100644 index 0000000..2eaff98 --- /dev/null +++ b/lib/linkerd2-request-filter/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "linkerd2-request-filter" +version = "0.1.0" +authors = ["Linkerd Developers "] +edition = "2018" +publish = false + +[dependencies] +futures = "0.1" +tower = "0.1" +tracing = "0.1" diff --git a/lib/linkerd2-request-filter/src/lib.rs b/lib/linkerd2-request-filter/src/lib.rs new file mode 100644 index 0000000..8a9e738 --- /dev/null +++ b/lib/linkerd2-request-filter/src/lib.rs @@ -0,0 +1,83 @@ +//! A `Service` middleware that applies arbitrary-user provided logic to each +//! target before it is issued to an inner service. 
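The crate boils down to one trait plus a `tower::Service` wrapper that consults it before calling the inner service. A standalone sketch of the trait and a filter in the spirit of the `PermitNamesInSuffixes` type added to `outbound::resolve` below; the concrete names and the suffix check are illustrative.

// Minimal restatement of the crate's filter trait for illustration; the real
// crate also provides the `Service` wrapper that applies it per request.
trait RequestFilter<T> {
    type Error;
    fn filter(&self, request: T) -> Result<T, Self::Error>;
}

// Hypothetical filter: only names under a permitted suffix pass through.
struct PermitSuffixes(Vec<&'static str>);

#[derive(Debug)]
struct Rejected;

impl RequestFilter<String> for PermitSuffixes {
    type Error = Rejected;

    fn filter(&self, name: String) -> Result<String, Rejected> {
        if self.0.iter().any(|suffix| name.ends_with(suffix)) {
            Ok(name)
        } else {
            Err(Rejected)
        }
    }
}

fn main() {
    let filter = PermitSuffixes(vec![".svc.cluster.local"]);
    assert!(filter.filter("web.default.svc.cluster.local".to_string()).is_ok());
    assert!(filter.filter("example.com".to_string()).is_err());
}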
+ +use futures::{Future, Poll}; + +pub type Error = Box; + +pub trait RequestFilter { + type Error: Into; + + fn filter(&self, request: T) -> Result; +} + +#[derive(Clone, Debug)] +pub struct Service { + filter: I, + service: S, +} + +#[derive(Debug)] +pub enum ResponseFuture { + Future(F), + Rejected(Option), +} + +// === impl Service === + +impl Service { + pub fn new(filter: I, service: S) -> Self + where + Self: tower::Service, + { + Self { filter, service } + } +} + +impl tower::Service for Service +where + I: RequestFilter, + S: tower::Service, + S::Error: Into, +{ + type Response = S::Response; + type Error = Error; + type Future = ResponseFuture; + + #[inline] + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready().map_err(Into::into) + } + + fn call(&mut self, request: T) -> Self::Future { + match self.filter.filter(request) { + Ok(req) => { + tracing::trace!("accepted"); + let f = self.service.call(req); + ResponseFuture::Future(f) + } + Err(e) => { + tracing::trace!("rejected"); + ResponseFuture::Rejected(Some(e.into())) + } + } + } +} + +// === impl ResponseFuture === + +impl Future for ResponseFuture +where + F: Future, + F::Error: Into, +{ + type Item = F::Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + match self { + ResponseFuture::Future(ref mut f) => f.poll().map_err(Into::into), + ResponseFuture::Rejected(ref mut e) => Err(e.take().unwrap()), + } + } +} diff --git a/src/app/main.rs b/src/app/main.rs index bcc98d9..4ce0e72 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -338,11 +338,9 @@ where .make(config.destination_addr.clone()) }; - let resolver = crate::resolve::Resolver::new( - dst_svc.clone(), - config.destination_get_suffixes.clone(), - config.destination_context.clone(), - ); + let resolver = crate::api_resolve::Resolve::new(dst_svc.clone()) + .with_scheme("k8s") + .with_context_token(&config.destination_context); let (tap_layer, tap_grpc, tap_daemon) = tap::new(); @@ -409,7 +407,11 @@ where &config, local_identity.clone(), outbound_listener.local_addr(), - resolver, + outbound::resolve( + config.destination_get_suffixes.clone(), + config.control_backoff.clone(), + resolver, + ), dns_resolver, profiles_client.clone(), tap_layer.clone(), diff --git a/src/app/outbound/discovery.rs b/src/app/outbound/discovery.rs deleted file mode 100644 index 0056554..0000000 --- a/src/app/outbound/discovery.rs +++ /dev/null @@ -1,143 +0,0 @@ -use super::super::dst::DstAddr; -use super::Endpoint; -use crate::core::resolve; -use crate::proxy::http::settings; -use crate::resolve::{Metadata, Unresolvable}; -use crate::transport::tls; -use crate::{Addr, Conditional, NameAddr}; -use futures::{future::Future, try_ready, Async, Poll}; -use tracing::debug; - -#[derive(Clone, Debug)] -pub struct Resolve>(R); - -#[derive(Debug)] -pub struct Resolution { - resolving: Resolving, - http_settings: settings::Settings, -} - -#[derive(Debug)] -enum Resolving { - Name { - dst_logical: Option, - dst_concrete: NameAddr, - resolution: R, - }, - Unresolvable, -} - -// === impl Resolve === - -impl Resolve -where - R: resolve::Resolve, -{ - pub fn new(resolve: R) -> Self { - Resolve(resolve) - } -} - -impl resolve::Resolve for Resolve -where - R: resolve::Resolve, - R::Future: Future, -{ - type Endpoint = Endpoint; - type Future = Resolution; - type Resolution = Resolution; - - fn resolve(&self, dst: &DstAddr) -> Self::Future { - let resolving = match dst.dst_concrete() { - Addr::Name(ref name) => Resolving::Name { - dst_logical: 
dst.dst_logical().name_addr().cloned(), - dst_concrete: name.clone(), - resolution: self.0.resolve(&name), - }, - Addr::Socket(_) => Resolving::Unresolvable, - }; - - Resolution { - http_settings: dst.http_settings, - resolving, - } - } -} - -// === impl Resolution === - -impl Future for Resolution -where - F: Future, -{ - type Item = Resolution; - type Error = F::Error; - fn poll(&mut self) -> Poll { - let resolving = match self.resolving { - Resolving::Name { - ref dst_logical, - ref dst_concrete, - ref mut resolution, - } => { - let res = try_ready!(resolution.poll()); - // TODO: get rid of unnecessary arc bumps? - Resolving::Name { - dst_logical: dst_logical.clone(), - dst_concrete: dst_concrete.clone(), - resolution: res, - } - } - Resolving::Unresolvable => return Err(Unresolvable::new()), - }; - Ok(Async::Ready(Resolution { - resolving, - // TODO: get rid of unnecessary clone - http_settings: self.http_settings.clone(), - })) - } -} - -impl resolve::Resolution for Resolution -where - R: resolve::Resolution, -{ - type Endpoint = Endpoint; - type Error = R::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.resolving { - Resolving::Name { - ref dst_logical, - ref dst_concrete, - ref mut resolution, - } => match try_ready!(resolution.poll()) { - resolve::Update::Remove(addr) => { - debug!("removing {}", addr); - Ok(Async::Ready(resolve::Update::Remove(addr))) - } - resolve::Update::Add(addr, metadata) => { - let identity = metadata - .identity() - .cloned() - .map(Conditional::Some) - .unwrap_or_else(|| { - Conditional::None( - tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(), - ) - }); - debug!("adding addr={}; identity={:?}", addr, identity); - let ep = Endpoint { - dst_logical: dst_logical.clone(), - dst_concrete: Some(dst_concrete.clone()), - addr, - identity, - metadata, - http_settings: self.http_settings, - }; - Ok(Async::Ready(resolve::Update::Add(addr, ep))) - } - }, - Resolving::Unresolvable => unreachable!("unresolvable endpoints have no resolutions"), - } - } -} diff --git a/src/app/outbound/endpoint.rs b/src/app/outbound/endpoint.rs index 0d40581..79130e3 100644 --- a/src/app/outbound/endpoint.rs +++ b/src/app/outbound/endpoint.rs @@ -1,11 +1,13 @@ -use super::super::{dst::Route, L5D_REQUIRE_ID}; +use crate::api_resolve::{Metadata, ProtocolHint}; +use crate::app::dst::{DstAddr, Route}; +use crate::app::L5D_REQUIRE_ID; use crate::proxy::http::{identity_from_header, settings}; use crate::proxy::Source; -use crate::resolve::{Metadata, ProtocolHint}; use crate::transport::{connect, tls}; use crate::{identity, tap}; use crate::{Conditional, NameAddr}; use indexmap::IndexMap; +use linkerd2_proxy_resolve::map_endpoint::MapEndpoint; use std::net::SocketAddr; use std::sync::Arc; @@ -19,6 +21,9 @@ pub struct Endpoint { pub http_settings: settings::Settings, } +#[derive(Copy, Clone, Debug)] +pub struct FromMetadata; + impl Endpoint { pub fn can_use_orig_proto(&self) -> bool { match self.metadata.protocol_hint() { @@ -146,3 +151,25 @@ impl tap::Inspect for Endpoint { true } } + +impl MapEndpoint for FromMetadata { + type Out = Endpoint; + + fn map_endpoint(&self, target: &DstAddr, addr: SocketAddr, metadata: Metadata) -> Endpoint { + let identity = metadata + .identity() + .cloned() + .map(Conditional::Some) + .unwrap_or_else(|| { + Conditional::None(tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into()) + }); + Endpoint { + addr, + identity, + metadata, + dst_logical: target.dst_logical().name_addr().cloned(), + dst_concrete: 
target.dst_concrete().name_addr().cloned(), + http_settings: target.http_settings.clone(), + } + } +} diff --git a/src/app/outbound/mod.rs b/src/app/outbound/mod.rs index 5575cb2..c451452 100644 --- a/src/app/outbound/mod.rs +++ b/src/app/outbound/mod.rs @@ -1,15 +1,15 @@ use super::{classify, config::Config, dst::DstAddr, identity, DispatchDeadline}; use crate::core::listen::ServeConnection; -use crate::core::resolve::{Resolution, Resolve}; +use crate::core::resolve::Resolve; use crate::proxy::http::{ balance, canonicalize, client, fallback, header_from_target, insert, metrics as http_metrics, normalize_uri, profiles, retry, router, settings, strip_header, }; -use crate::proxy::{self, accept, resolve, Server}; -use crate::resolve::{Metadata, Unresolvable}; +use crate::proxy::{self, accept, Server}; use crate::transport::Connection; use crate::transport::{self, connect, keepalive, tls}; -use crate::{svc, Addr, NameAddr}; +use crate::{svc, Addr}; +use linkerd2_proxy_discover as discover; use linkerd2_reconnect as reconnect; use std::net::SocketAddr; use std::time::Duration; @@ -20,13 +20,14 @@ use tracing::debug; mod add_remote_ip_on_rsp; #[allow(dead_code)] // TODO #2597 mod add_server_id_on_rsp; -mod discovery; mod endpoint; mod orig_proto_upgrade; mod require_identity_on_endpoint; +mod resolve; pub(super) use self::endpoint::Endpoint; pub(super) use self::require_identity_on_endpoint::RequireIdentityError; +pub(super) use self::resolve::resolve; const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30); const EWMA_DECAY: Duration = Duration::from_secs(10); @@ -46,10 +47,9 @@ pub fn server( transport_metrics: transport::metrics::Registry, ) -> impl ServeConnection where - R: Resolve + Clone + Send + Sync + 'static, - R::Future: futures::Future + Send, + R: Resolve + Clone + Send + Sync + 'static, + R::Future: Send, R::Resolution: Send, - ::Error: std::error::Error + Send + Sync + 'static, P: GrpcService + Clone + Send + Sync + 'static, P::ResponseBody: Send, ::Data: Send, @@ -149,18 +149,20 @@ where // Resolves the target via the control plane and balances requests // over all endpoints returned from the destination service. + const DISCOVER_UPDATE_BUFFER_CAPACITY: usize = 2; let balancer_layer = svc::layers() .push_spawn_ready() - .push(resolve::layer(discovery::Resolve::new(resolve))) + .push(discover::Layer::new( + DISCOVER_UPDATE_BUFFER_CAPACITY, + resolve, + )) .push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)); + // If the balancer fails to be created, i.e., because it is unresolvable, + // fall back to using a router that dispatches request to the + // application-selected original destination. let distributor = endpoint_stack - .push( - // Attempt to build a balancer. If the service is - // unresolvable, fall back to using a router that dispatches - // request to the application-selected original destination. 
- fallback::layer(balancer_layer, orig_dst_router_layer).on_error::(), - ) + .push(fallback::layer(balancer_layer, orig_dst_router_layer)) .serves::(); // A per-`DstAddr` stack that does the following: diff --git a/src/app/outbound/resolve.rs b/src/app/outbound/resolve.rs new file mode 100644 index 0000000..a6dff41 --- /dev/null +++ b/src/app/outbound/resolve.rs @@ -0,0 +1,112 @@ +use super::endpoint; +use crate::api_resolve::Metadata; +use crate::app::dst::DstAddr; +use crate::dns::Suffix; +use indexmap::IndexSet; +use linkerd2_error::{Error, Recover}; +use linkerd2_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; +use linkerd2_proxy_core::{resolve, Resolve}; +use linkerd2_proxy_resolve::{map_endpoint, recover}; +use linkerd2_request_filter as request_filter; +use std::sync::Arc; +use tower_grpc as grpc; + +pub fn resolve( + suffixes: S, + backoff: ExponentialBackoff, + resolve: R, +) -> map_endpoint::Resolve< + endpoint::FromMetadata, + request_filter::Service< + PermitNamesInSuffixes, + resolve::Service>, + >, +> +where + R: Resolve + Clone, + S: IntoIterator, +{ + map_endpoint::Resolve::new( + endpoint::FromMetadata, + request_filter::Service::new( + suffixes.into(), + recover::Resolve::new(backoff.into(), resolve).into_service(), + ), + ) +} + +#[derive(Clone, Debug)] +pub struct PermitNamesInSuffixes { + permitted: Arc>, +} + +#[derive(Clone, Debug, Default)] +pub struct BackoffUnlessInvalidArgument(ExponentialBackoff); + +#[derive(Debug)] +pub struct Unresolvable(()); + +// === impl PermitNamesInSuffixes === + +impl> From for PermitNamesInSuffixes { + fn from(permitted: I) -> Self { + Self { + permitted: Arc::new(permitted.into_iter().collect()), + } + } +} + +impl request_filter::RequestFilter for PermitNamesInSuffixes { + type Error = Unresolvable; + + fn filter(&self, dst: DstAddr) -> Result { + if let Some(name) = dst.dst_concrete().name_addr() { + if self + .permitted + .iter() + .any(|suffix| suffix.contains(name.name())) + { + tracing::debug!("suffix matches"); + return Ok(dst); + } + } + + Err(Unresolvable(())) + } +} + +// === impl Unresolvable === + +impl std::fmt::Display for Unresolvable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "unresolvable") + } +} + +impl std::error::Error for Unresolvable {} + +// === impl BackoffUnlessInvalidArgument === + +impl From for BackoffUnlessInvalidArgument { + fn from(eb: ExponentialBackoff) -> Self { + BackoffUnlessInvalidArgument(eb) + } +} + +impl Recover for BackoffUnlessInvalidArgument { + type Backoff = ExponentialBackoffStream; + type Error = ::Error; + + fn recover(&self, err: Error) -> Result { + match err.downcast::() { + Ok(ref status) if status.code() == grpc::Code::InvalidArgument => { + tracing::debug!(message = "cannot recover", %status); + return Err(Unresolvable(()).into()); + } + Ok(status) => tracing::debug!(message = "recovering", %status), + Err(error) => tracing::debug!(message = "recovering", %error), + } + + Ok(self.0.stream()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 1b3aa1e..a30fc27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,8 +7,8 @@ use linkerd2_error::{Error, Never}; use linkerd2_identity as identity; use linkerd2_metrics as metrics; use linkerd2_proxy_api as api; +use linkerd2_proxy_api_resolve as api_resolve; use linkerd2_proxy_core::{self as core, drain}; -use linkerd2_proxy_resolve as resolve; use linkerd2_task as task; pub mod app; diff --git a/src/proxy/http/profiles/router.rs b/src/proxy/http/profiles/router.rs index 88283fc..638d030 
100644 --- a/src/proxy/http/profiles/router.rs +++ b/src/proxy/http/profiles/router.rs @@ -133,24 +133,23 @@ where } } -impl - svc::Service for MakeSvc +impl svc::Service + for MakeSvc where + G: GetRoutes, Target: CanGetDestination + WithRoute + WithAddr + Eq + Hash + Clone, ::Output: Eq + Hash + Clone, - Inner: rt::Make + Clone, - InnerSvc: svc::Service> + Clone, - InnerSvc::Error: Into, - G: GetRoutes, - RouteLayer: svc::Layer< - svc::shared::Shared>, - Service = RouteMake, - > + Clone, - RouteMake: rt::Make<::Output, Value = RouteSvc> + Clone, + Inner: rt::Make + Clone, + Inner::Value: svc::Service> + Clone, + >>::Error: Into, + RouteLayer: + svc::Layer>> + Clone, + RouteLayer::Service: rt::Make<::Output, Value = RouteSvc> + Clone, RouteSvc: svc::Service> + Clone, RouteSvc::Error: Into, { - type Response = Service; + type Response = + Service; type Error = Never; type Future = futures::future::FutureResult; @@ -169,19 +168,20 @@ where rt::Router::new_fixed(rec, make) }; - let stack = self.route_layer.layer(svc::shared(concrete_router.clone())); + let concrete_stack = self.route_layer.layer(svc::shared(concrete_router.clone())); // Initially there are no routes, so build a route router with only // the default route. - let default_route = target.clone().with_route(self.default_route.clone()); + let router = { + let default_route = target.clone().with_route(self.default_route.clone()); + let stack = rt::Make::make(&concrete_stack, &default_route); - let mut make = IndexMap::with_capacity(1); - make.insert(default_route.clone(), stack.make(&default_route)); + let mut make = IndexMap::with_capacity(1); + make.insert(default_route.clone(), stack); - let router = rt::Router::new_fixed( - RouteRecognize::new(target.clone(), Vec::new(), self.default_route.clone()), - make, - ); + let recognize = RouteRecognize::new(target.clone(), vec![], self.default_route.clone()); + rt::Router::new_fixed(recognize, make) + }; // Initiate a stream to get route and dst_override updates for this // destination. diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 893389c..a99ed5a 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -6,7 +6,6 @@ pub mod grpc; pub mod http; pub mod pending; mod protocol; -pub mod resolve; pub mod server; mod tcp; diff --git a/src/svc.rs b/src/svc.rs index 03f9b5e..e7a27e9 100644 --- a/src/svc.rs +++ b/src/svc.rs @@ -1,5 +1,6 @@ use crate::proxy::{buffer, pending}; -pub use linkerd2_stack::{self as stack, layer, shared, Layer, LayerExt}; +pub use linkerd2_router::Make; +pub use linkerd2_stack::{self as stack, layer, map_target, shared, Layer, LayerExt}; pub use linkerd2_timeout::stack as timeout; use std::time::Duration; use tower::layer::util::{Identity, Stack as Pair}; @@ -103,6 +104,14 @@ impl Stack { self.push(TimeoutLayer::new(timeout)) } + /// Validates that this stack serves T-typed targets. + pub fn makes(self) -> Self + where + S: Make, + { + self + } + /// Validates that this stack serves T-typed targets. pub fn serves(self) -> Self where
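The `makes`/`serves` helpers above are pure compile-time checks: each is a no-op method whose only purpose is its trait bound, so a stack that cannot handle the stated target type fails to compile at the call site instead of deep inside a layered type error. A minimal sketch of the pattern, with hypothetical `Make`, `Stack`, and `Connector` types standing in for the proxy's traits:

use std::net::SocketAddr;

trait Make<T> {
    type Value;
    fn make(&self, target: &T) -> Self::Value;
}

struct Stack<S>(S);

impl<S> Stack<S> {
    /// No-op whose only purpose is its bound: if the inner stack cannot make
    /// services for `T`-typed targets, the program fails to compile here.
    fn makes<T>(self) -> Self
    where
        S: Make<T>,
    {
        self
    }
}

struct Connector;

impl Make<SocketAddr> for Connector {
    type Value = String;

    fn make(&self, target: &SocketAddr) -> String {
        format!("connection to {}", target)
    }
}

fn main() {
    // Compiles because `Connector: Make<SocketAddr>`; an unsupported target
    // type would be rejected at this call site with a pointed error.
    let stack = Stack(Connector).makes::<SocketAddr>();
    let conn = stack.0.make(&SocketAddr::from(([127, 0, 0, 1], 8080)));
    println!("{}", conn);
}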