ready-cache: Add endpoint-level debugging
linkerd/linkerd2#6086 describes an issue that sounds closely related to
tower-rs#415: There's some sort of consistency issue between the
ready-cache's pending stream and its set of cancelations. Where the
latter issue describes triggering a panic in the stream receiver, the
former describes triggering a panic in the stream implementation.

There's no logical reason why we can't continue to operate in this
scenario, though it does indicate a real correctness issue.
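
For context, a rough sketch of the two pieces of state that are supposed to
stay in sync, using the field names visible in the diff below but otherwise
simplified (this is not the actual tower implementation):

```rust
use futures_util::stream::FuturesUnordered;
use indexmap::IndexMap;
use tokio::sync::oneshot;

// Simplified bookkeeping: each future pushed onto `pending` is expected to
// keep a matching cancelation sender in `pending_cancel_txs` until it either
// completes or is canceled. The linked issues describe states where one side
// exists without the other.
struct PendingSet<K, F> {
    pending: FuturesUnordered<F>,
    pending_cancel_txs: IndexMap<K, oneshot::Sender<()>>,
}
```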

So, this change avoids panicking in this scenario when debug assertions
are disabled. Instead, we now emit WARN-level logs so that we have a
clearer signal when these inconsistencies occur.
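
As a rough sketch of the pattern the change adopts (the helper and its
arguments here are illustrative, not the exact tower code): warn and keep
going in release builds, while `debug_assert!` still panics in builds with
debug assertions enabled.

```rust
use tracing::warn;

// Illustrative helper: `cancel_present` stands in for "the ready key still
// had a cancelation registered". In release builds this only logs a WARN;
// with debug assertions enabled the `debug_assert!` still panics so the
// inconsistency stays loud in tests.
fn note_missing_cancelation<K: std::fmt::Debug>(key: &K, cancel_present: bool) {
    if !cancel_present {
        warn!(?key, "Ready service had no associated cancelation");
        debug_assert!(cancel_present, "cancel receiver lost");
    }
}
```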

Finally, this change also adds `Debug` constraints to the cache's key
types (and hence the balancer's key types) so that we can more
reasonably debug this behavior.
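
Concretely, this means any discovery key used with the balancer or the
ready-cache must now implement `Debug`; in practice that's usually just a
derive on the key type. A minimal illustration (the `Endpoint` type is
hypothetical):

```rust
use std::net::SocketAddr;

// Hypothetical discovery key: adding `Debug` alongside the existing
// `Clone`/`Eq`/`Hash` requirements satisfies the new
// `D::Key: Hash + Clone + std::fmt::Debug` bound and lets the cache include
// the key in its DEBUG/WARN logs via `?key`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Endpoint {
    addr: SocketAddr,
}
```
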
olix0r committed Jun 16, 2021
1 parent 7776019 commit c8190b4
Showing 2 changed files with 51 additions and 22 deletions.
4 changes: 2 additions & 2 deletions tower/src/balance/p2c/service.rs
@@ -117,7 +117,7 @@ where
impl<D, Req> Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Hash + Clone,
D::Key: Hash + Clone + std::fmt::Debug,
D::Error: Into<crate::BoxError>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
@@ -223,7 +223,7 @@ where
impl<D, Req> Service<Req> for Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Hash + Clone,
D::Key: Hash + Clone + std::fmt::Debug,
D::Error: Into<crate::BoxError>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
69 changes: 49 additions & 20 deletions tower/src/ready_cache/cache.rs
@@ -11,7 +11,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};

/// Drives readiness over a set of services.
///
@@ -173,24 +173,31 @@
/// Services are dropped from the ready set immediately. Services in the
/// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
/// must be called to cause the service to be dropped.
pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
pub fn evict<Q>(&mut self, key: &Q) -> bool
where
Q: Hash + Equivalent<K> + std::fmt::Debug,
{
let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
debug!(?key, "Evicting pending service");
c.send(()).expect("cancel receiver lost");
true
} else {
false
};

self.ready
.swap_remove_full(key)
.map(|_| true)
.swap_remove(key)
.map(|_| {
debug!(?key, "Evicted ready service");
true
})
.unwrap_or(canceled)
}
}

impl<K, S, Req> ReadyCache<K, S, Req>
where
K: Clone + Eq + Hash,
K: Clone + Eq + Hash + std::fmt::Debug,
S: Service<Req>,
<S as Service<Req>>::Error: Into<crate::BoxError>,
S::Error: Into<crate::BoxError>,
@@ -206,15 +213,19 @@ where
///
/// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
pub fn push(&mut self, key: K, svc: S) {
debug!(?key, "Adding service");
let cancel = oneshot::channel();
self.push_pending(key, svc, cancel);
}

fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
debug!(?key, "Canceling pending service for replacement");
// If there is already a service for this key, cancel it.
c.send(()).expect("cancel receiver lost");
}

trace!(?key, "Adding pending service");
self.pending.push(Pending {
key: Some(key),
cancel: Some(cancel_rx),
@@ -243,12 +254,10 @@
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
trace!("endpoint ready");
trace!(?key, "Endpoint became ready");
let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
if let Some(cancel_tx) = cancel_tx {
// Keep track of the cancelation so that it need not be
// recreated after the service is used.
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
let (cancel_tx, cancel_rx) = if let Some(cancel_tx) = cancel_tx {
(cancel_tx, cancel_rx)
} else {
// This should not technically be possible. We must have decided to cancel
// a Service (by sending on the CancelTx), yet that same service then
Expand All @@ -260,27 +269,29 @@ where
// So, we instead detect the endpoint as canceled at this point. That
// should be fine, since the oneshot is only really there to ensure that
// the Pending is polled again anyway.
//
warn!(?key, "Ready service had no associated cancelation");
// We assert that this can't happen in debug mode so that hopefully one day
// we can find a test that triggers this reliably.
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed when ready");
}
oneshot::channel()
};
// Keep track of the cancelation so that it need not be recreated after the
// service is used.
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
}
Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
debug!("endpoint canceled");
Poll::Ready(Some(Err(PendingError::Canceled(key)))) => {
debug!(?key, "Endpoint canceled");
// The cancellation for this service was removed in order to
// cause this cancellation.
}
Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
if cancel_tx.is_some() {
return Err(error::Failed(key, e.into())).into();
} else {
if cancel_tx.is_none() {
// See comment for the same clause under Ready(Some(Ok)).
warn!(?key, "Failed service had no associated cancelation");
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed on error");
}
return Err(error::Failed(key, e.into())).into();
}
}
}
@@ -324,11 +335,14 @@
.ready
.swap_remove_index(index)
.expect("invalid ready index");
debug!(?key, "Ready service became pending");

// If a new version of this service has been added to the
// unready set, don't overwrite it.
if !self.pending_contains(&key) {
self.push_pending(key, svc, cancel);
} else {
debug!(?key, "Service already has a pending replacement");
}

Ok(false)
@@ -367,13 +381,16 @@
.ready
.swap_remove_index(index)
.expect("check_ready_index was not called");
debug!(?key, "Issuing request to ready service");

let fut = svc.call(req);

// If a new version of this service has been added to the
// unready set, don't overwrite it.
if !self.pending_contains(&key) {
self.push_pending(key, svc, cancel);
} else {
debug!(?key, "Service already has a pending replacement");
}

fut
@@ -387,15 +404,27 @@ impl<K, S, Req> Unpin for Pending<K, S, Req> {}

impl<K, S, Req> Future for Pending<K, S, Req>
where
K: std::fmt::Debug,
S: Service<Req>,
{
type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut fut = self.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
let key = self.key.take().expect("polled after complete");
if r.is_err() {
// XXX Similar to the scenario described in `ReadyCache::poll_pending`, it should
// not be possible for this future to be polled when the cancelation has been
// dropped (without it being notified). But, this scenario appears to occur in the
// wild.
//
// This isn't technically a problem, though--there's no reason we can't continue to
// fail this future as if it were canceled. It's just deeply weird. So, for now, we
// log a warning and panic in debug mode.
warn!(?key, "Pending service lost its cancelation");
debug_assert!(r.is_ok(), "cancel sender lost");
}
return Err(PendingError::Canceled(key)).into();
}
