Skip to content

Commit

Permalink
Merge branch 'ver/2.161/opaque' into release/v2.161
Browse files Browse the repository at this point in the history
Signed-off-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
olix0r committed Apr 19, 2022
2 parents 53ee265 + c6d79c9 commit 42c5d8a
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 9 deletions.
9 changes: 7 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ pub struct FromMetadata {
// === impl Endpoint ===

impl Endpoint<()> {
pub(crate) fn forward(addr: OrigDstAddr, reason: tls::NoClientTls) -> Self {
pub(crate) fn forward(
addr: OrigDstAddr,
reason: tls::NoClientTls,
opaque_protocol: bool,
) -> Self {
Self {
addr: Remote(ServerAddr(addr.into())),
metadata: Metadata::default(),
tls: Conditional::None(reason),
logical_addr: None,
opaque_protocol: false,
opaque_protocol,
protocol: (),
}
}
Expand Down Expand Up @@ -269,6 +273,7 @@ pub mod tests {
.new_service(tcp::Endpoint::forward(
OrigDstAddr(addr),
tls::NoClientTls::Disabled,
false,
));

let (client_io, server_io) = support::io::duplex(4096);
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/http/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ impl<N> Outbound<N> {
// detection and just use the TCP stack directly.
|target: T| -> Result<_, Infallible> {
if let Some(Skip) = target.param() {
tracing::debug!("Skipping HTTP protocol detection");
return Ok(svc::Either::B(target));
}
tracing::debug!("Attempting HTTP protocol detection");
Ok(svc::Either::A(target))
},
skipped,
Expand Down
63 changes: 57 additions & 6 deletions linkerd/app/outbound/src/switch_logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,48 @@ impl<S> Outbound<S> {
.push_switch(
move |(profile, target): (Option<profiles::Receiver>, T)| -> Result<_, Infallible> {
if let Some(rx) = profile {
// If the profile provides an endpoint, then the target is single endpoint and
// not a logical/load-balanced service.
let is_opaque = rx.is_opaque_protocol();

// If the profile provides an endpoint, then the target is single
// endpoint and not a logical/load-balanced service.
if let Some((addr, metadata)) = rx.endpoint() {
tracing::debug!(%is_opaque, "Profile describes an endpoint");
return Ok(svc::Either::A(Endpoint::from_metadata(
addr,
metadata,
no_tls_reason,
rx.is_opaque_protocol(),
is_opaque,
&*inbound_ips,
)));
}

// Otherwise, if the profile provides a (named) logical address, then we build a
// If the profile provides a (named) logical address, then we build a
// logical stack so we apply routes, traffic splits, and load balancing.
if let Some(logical_addr) = rx.logical_addr() {
tracing::debug!("Profile describes a logical service");
return Ok(svc::Either::B(Logical::new(logical_addr, rx)));
}

// Otherwise, if there was a profile but it didn't include an endpoint or logical
// address, create a bare endpoint from the original destination address
// using the profile-provided opaqueness. This applies for targets that
// aren't known by the destination controller that may target ports
// included in the cluster-wide default opaque list.
tracing::debug!("Unknown endpoint");
return Ok(svc::Either::A(Endpoint::forward(
target.param(),
no_tls_reason,
is_opaque,
)));
}

// If there was no profile or it didn't include any useful metadata, create a bare
// endpoint from the original destination address.
// If there was no profile, create a bare endpoint from the original
// destination address.
tracing::debug!("No profile");
Ok(svc::Either::A(Endpoint::forward(
target.param(),
no_tls_reason,
false,
)))
},
logical,
Expand Down Expand Up @@ -171,4 +189,37 @@ mod tests {
let (server_io, _client_io) = io::duplex(1);
svc.oneshot(server_io).await.expect("service must succeed");
}

#[tokio::test(flavor = "current_thread")]
async fn profile_neither() {
let _trace = linkerd_tracing::test::trace_init();

let endpoint_addr = SocketAddr::new([192, 0, 2, 20].into(), 2020);
let endpoint = {
let endpoint_addr = endpoint_addr.clone();
move |ep: tcp::Endpoint| {
assert_eq!(ep.addr.as_ref(), &endpoint_addr);
assert!(ep.opaque_protocol, "protocol must be marked opaque");
svc::mk(|_: io::DuplexStream| future::ok::<(), Error>(()))
}
};

let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
.with_stack(endpoint)
.push_switch_logical(svc::Fail::<_, WrongStack>::default())
.into_inner();

let (_tx, profile) = tokio::sync::watch::channel(profiles::Profile {
endpoint: None,
opaque_protocol: true,
addr: None,
..Default::default()
});

let orig_dst = OrigDstAddr(endpoint_addr);
let svc = stack.new_service((Some(profile.into()), orig_dst));
let (server_io, _client_io) = io::duplex(1);
svc.oneshot(server_io).await.expect("service must succeed");
}
}
4 changes: 3 additions & 1 deletion linkerd/service-profiles/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
task::{Context, Poll},
};
use tonic::{body::BoxBody, client::GrpcService};
use tracing::debug;
use tracing::{debug, trace};

/// Creates watches on service profiles.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -73,7 +73,9 @@ where
Box::pin(async move {
match w.spawn_watch(addr).await {
Ok(rsp) => {
debug!("Resolved profile");
let rx = rsp.into_inner();
trace!(profile = ?rx.borrow());
Ok(Some(rx.into()))
}
Err(status) => {
Expand Down

0 comments on commit 42c5d8a

Please sign in to comment.