Skip to content

Commit

Permalink
control: Build the identity client each time its used (#1021)
Browse files Browse the repository at this point in the history
We currently build the identity client at startup and hold it to be used
fairly infrequently: the client is used once per day by default, and
even in extremely aggressive scenarios is unlikely to be used more
frequently than once every few minutes. As such, there's very little
benefit to holding the client (and buffers, DNS resolutions, etc) open
continually. Instead, it seems preferable to instantiate the identity
client only as it's needed.

Practically, we see issues like linkerd/linkerd2#6184 where the identity
client may try to reconnect to stale endpoints when the identity
deployment is rescheduled (because there aren't a steady stream of
requests on this client).

This change makes the controller stack a `NewService<()>` so that
clients can be instantiated lazily. The identity module now creates a
new connection for each identity request. Other controller clients are
unaffacted, continuing to use long-live clients.
  • Loading branch information
olix0r committed Jun 1, 2021
1 parent 99e3c06 commit cd6da0f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 37 deletions.
30 changes: 14 additions & 16 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use crate::{
classify, config, control, dns, metrics,
proxy::http,
reconnect,
svc::{self, NewService, Param},
tls,
transport::ConnectTcp,
Addr, Error,
classify, config, control, dns, metrics, proxy::http, reconnect, svc, tls,
transport::ConnectTcp, Addr, Error,
};
use futures::future::Either;
use std::fmt;
Expand All @@ -26,7 +21,7 @@ pub struct ControlAddr {
pub identity: tls::ConditionalClientTls,
}

impl Param<Addr> for ControlAddr {
impl svc::Param<Addr> for ControlAddr {
fn param(&self) -> Addr {
self.addr.clone()
}
Expand All @@ -51,13 +46,15 @@ impl Config {
dns: dns::Resolver,
metrics: metrics::ControlHttp,
identity: Option<L>,
) -> Client<B>
) -> svc::BoxNewService<(), Client<B>>
where
B: http::HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync + 'static,
L: Clone + Param<tls::client::Config> + Send + 'static,
L: Clone + svc::Param<tls::client::Config> + Send + Sync + 'static,
{
let addr = self.addr;

let connect_backoff = {
let backoff = self.connect.backoff;
move |_| Ok(backoff.stream())
Expand Down Expand Up @@ -102,7 +99,9 @@ impl Config {
.push(metrics.to_layer::<classify::Response, _>())
.push(self::add_origin::layer())
.push_on_response(svc::layers().push_spawn_buffer(self.buffer_capacity))
.new_service(self.addr)
.push_map_target(move |()| addr.clone())
.push(svc::BoxNewService::layer())
.into_inner()
}
}

Expand Down Expand Up @@ -234,8 +233,7 @@ mod balance {
mod client {
use crate::{
proxy::http,
svc::{self, Param},
tls,
svc, tls,
transport::{Remote, ServerAddr},
};
use linkerd_proxy_http::h2::Settings as H2Settings;
Expand Down Expand Up @@ -263,19 +261,19 @@ mod client {

// === impl Target ===

impl Param<Remote<ServerAddr>> for Target {
impl svc::Param<Remote<ServerAddr>> for Target {
fn param(&self) -> Remote<ServerAddr> {
Remote(ServerAddr(self.addr))
}
}

impl Param<SocketAddr> for Target {
impl svc::Param<SocketAddr> for Target {
fn param(&self) -> SocketAddr {
self.addr
}
}

impl Param<tls::ConditionalClientTls> for Target {
impl svc::Param<tls::ConditionalClientTls> for Target {
fn param(&self) -> tls::ConditionalClientTls {
self.server_id.clone()
}
Expand Down
3 changes: 2 additions & 1 deletion linkerd/app/src/dst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use linkerd_app_core::{
metrics,
profiles::{self, DiscoveryRejected},
proxy::{api_resolve as api, identity::LocalCrtKey, resolve::recover},
svc::NewService,
Error, Recover,
};
use tonic::body::BoxBody;
Expand Down Expand Up @@ -41,7 +42,7 @@ impl Config {
) -> Result<Dst, Error> {
let addr = self.control.addr.clone();
let backoff = BackoffUnlessInvalidArgument(self.control.connect.backoff);
let svc = self.control.build(dns, metrics, identity);
let svc = self.control.build(dns, metrics, identity).new_service(());

Ok(Dst {
addr,
Expand Down
11 changes: 6 additions & 5 deletions linkerd/app/src/oc_collector.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::{dns, identity::LocalCrtKey};
use linkerd_app_core::{control, metrics::ControlHttp as HttpMetrics, Error};
use linkerd_app_core::{control, metrics::ControlHttp as HttpMetrics, svc::NewService, Error};
use linkerd_opencensus::{self as opencensus, metrics, proto};
use std::future::Future;
use std::pin::Pin;
use std::{collections::HashMap, time::SystemTime};
use std::{collections::HashMap, future::Future, pin::Pin, time::SystemTime};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::Instrument;
Expand Down Expand Up @@ -51,7 +49,10 @@ impl Config {
Config::Disabled => Ok(OcCollector::Disabled),
Config::Enabled(inner) => {
let addr = inner.control.addr.clone();
let svc = inner.control.build(dns, client_metrics, identity);
let svc = inner
.control
.build(dns, client_metrics, identity)
.new_service(());

let (span_sink, spans_rx) = mpsc::channel(Self::SPAN_BUFFER_CAPACITY);
let spans_rx = ReceiverStream::new(spans_rx);
Expand Down
37 changes: 22 additions & 15 deletions linkerd/proxy/identity/src/certify.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use http_body::Body as HttpBody;
use linkerd2_proxy_api::identity as api;
use linkerd2_proxy_api::identity::{self as api, identity_client::IdentityClient};
use linkerd_error::Error;
use linkerd_identity as id;
use linkerd_metrics::Counter;
use linkerd_stack::Param;
use linkerd_stack::{NewService, Param};
use linkerd_tls as tls;
use pin_project::pin_project;
use std::convert::TryFrom;
Expand Down Expand Up @@ -84,12 +84,13 @@ impl Config {
// === impl Daemon ===

impl Daemon {
pub async fn run<T>(self, client: T)
pub async fn run<N, S>(self, mut new_client: N)
where
T: GrpcService<BoxBody>,
T::ResponseBody: Send + 'static,
<T::ResponseBody as Body>::Data: Send,
<T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
N: NewService<(), Service = S>,
S: GrpcService<BoxBody>,
S::ResponseBody: Send + 'static,
<S::ResponseBody as Body>::Data: Send,
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
{
let Self {
crt_key_watch,
Expand All @@ -99,18 +100,24 @@ impl Daemon {

debug!("Identity daemon running");
let mut curr_expiry = UNIX_EPOCH;
let mut client = api::identity_client::IdentityClient::new(client);

loop {
match config.token.load() {
Ok(token) => {
let req = grpc::Request::new(api::CertifyRequest {
token,
identity: config.local_id.to_string(),
certificate_signing_request: config.csr.to_vec(),
});
trace!("daemon certifying");
let rsp = client.certify(req).await;
let rsp = {
// The client is used for infrequent communication with the identity controller;
// so clients are instantiated on-demand rather than held.
let mut client = IdentityClient::new(new_client.new_service(()));

trace!("daemon certifying");
let req = grpc::Request::new(api::CertifyRequest {
token,
identity: config.local_id.to_string(),
certificate_signing_request: config.csr.to_vec(),
});
client.certify(req).await
};

match rsp {
Err(e) => error!("Failed to certify identity: {}", e),
Ok(rsp) => {
Expand Down

0 comments on commit cd6da0f

Please sign in to comment.