From 1729b165bb06c021f631cdc332b8bcbe244d4902 Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Wed, 16 Jun 2021 01:26:41 +0000
Subject: [PATCH] ready-cache: Add endpoint-level debugging

linkerd/linkerd2#6086 describes an issue that sounds closely related to
tower-rs/tower#415: there is some sort of consistency issue between the
ready-cache's pending stream and its set of cancelations. Where the
latter issue describes triggering a panic in the stream receiver, the
former describes triggering a panic in the stream implementation.

There's no logical reason why we can't continue to operate in this
scenario, though it does indicate a real correctness issue. So, this
change avoids panicking in this scenario except in builds with debug
assertions enabled. Instead, we now emit WARN-level logs so that we
have a clearer signal when these situations occur.

Finally, this change also adds `Debug` constraints to the cache's key
types (and hence the balancer's key types) so that we can more
reasonably debug this behavior.
---
 tower/src/balance/p2c/service.rs |  4 +-
 tower/src/ready_cache/cache.rs   | 69 +++++++++++++++++++++++---------
 2 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs
index 97cccb919..eb5df0675 100644
--- a/tower/src/balance/p2c/service.rs
+++ b/tower/src/balance/p2c/service.rs
@@ -112,7 +112,7 @@ where
 impl<D, Req> Balance<D, Req>
 where
     D: Discover + Unpin,
-    D::Key: Hash + Clone,
+    D::Key: Hash + Clone + std::fmt::Debug,
     D::Error: Into<crate::BoxError>,
     D::Service: Service<Req> + Load,
     <D::Service as Load>::Metric: std::fmt::Debug,
@@ -218,7 +218,7 @@ where
 impl<D, Req> Service<Req> for Balance<D, Req>
 where
     D: Discover + Unpin,
-    D::Key: Hash + Clone,
+    D::Key: Hash + Clone + std::fmt::Debug,
     D::Error: Into<crate::BoxError>,
     D::Service: Service<Req> + Load,
     <D::Service as Load>::Metric: std::fmt::Debug,
diff --git a/tower/src/ready_cache/cache.rs b/tower/src/ready_cache/cache.rs
index f1fb692a0..172ad6a39 100644
--- a/tower/src/ready_cache/cache.rs
+++ b/tower/src/ready_cache/cache.rs
@@ -11,7 +11,7 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use tokio::sync::oneshot;
 use tower_service::Service;
-use tracing::{debug, trace};
+use tracing::{debug, trace, warn};
 
 /// Drives readiness over a set of services.
 ///
@@ -168,8 +168,12 @@ where
     /// Services are dropped from the ready set immediately. Services in the
     /// pending set are marked for cancellation, but `ReadyCache::poll_pending`
     /// must be called to cause the service to be dropped.
-    pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
+    pub fn evict<Q>(&mut self, key: &Q) -> bool
+    where
+        Q: Hash + Equivalent<K> + std::fmt::Debug,
+    {
         let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
+            debug!(?key, "Evicting pending service");
             c.send(()).expect("cancel receiver lost");
             true
         } else {
@@ -177,15 +181,18 @@ where
         };
 
         self.ready
-            .swap_remove_full(key)
-            .map(|_| true)
+            .swap_remove(key)
+            .map(|_| {
+                debug!(?key, "Evicted ready service");
+                true
+            })
             .unwrap_or(canceled)
     }
 }
 
 impl<K, S, Req> ReadyCache<K, S, Req>
 where
-    K: Clone + Eq + Hash,
+    K: Clone + Eq + Hash + std::fmt::Debug,
     S: Service<Req>,
     <S as Service<Req>>::Error: Into<crate::BoxError>,
     S::Error: Into<crate::BoxError>,
 {
@@ -199,15 +206,19 @@ where
     /// the pending set; OR, when the new service becomes ready, it will replace
     /// the prior service in the ready set.
     pub fn push(&mut self, key: K, svc: S) {
+        debug!(?key, "Adding service");
         let cancel = oneshot::channel();
         self.push_pending(key, svc, cancel);
     }
 
     fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
         if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
+            debug!(?key, "Canceling pending service for replacement");
             // If there is already a service for this key, cancel it.
             c.send(()).expect("cancel receiver lost");
         }
+
+        trace!(?key, "Adding pending service");
         self.pending.push(Pending {
             key: Some(key),
             cancel: Some(cancel_rx),
@@ -232,12 +243,10 @@ where
                 Poll::Pending => return Poll::Pending,
                 Poll::Ready(None) => return Poll::Ready(Ok(())),
                 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
-                    trace!("endpoint ready");
+                    trace!(?key, "Endpoint became ready");
                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
-                    if let Some(cancel_tx) = cancel_tx {
-                        // Keep track of the cancelation so that it need not be
-                        // recreated after the service is used.
-                        self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
+                    let (cancel_tx, cancel_rx) = if let Some(cancel_tx) = cancel_tx {
+                        (cancel_tx, cancel_rx)
                     } else {
                         // This should not technically be possible. We must have decided to cancel
                         // a Service (by sending on the CancelTx), yet that same service then
@@ -249,27 +258,29 @@ where
                         // So, we instead detect the endpoint as canceled at this point. That
                         // should be fine, since the oneshot is only really there to ensure that
                         // the Pending is polled again anyway.
-                        //
+                        warn!(?key, "Ready service had no associated cancelation");
                         // We assert that this can't happen in debug mode so that hopefully one day
                         // we can find a test that triggers this reliably.
                         debug_assert!(cancel_tx.is_some());
-                        debug!("canceled endpoint removed when ready");
-                    }
+                        oneshot::channel()
+                    };
+                    // Keep track of the cancelation so that it need not be recreated after the
+                    // service is used.
+                    self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
                 }
-                Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
-                    debug!("endpoint canceled");
+                Poll::Ready(Some(Err(PendingError::Canceled(key)))) => {
+                    debug!(?key, "Endpoint canceled");
                     // The cancellation for this service was removed in order to
                     // cause this cancellation.
                 }
                 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
-                    if let Some(_) = cancel_tx {
-                        return Err(error::Failed(key, e.into())).into();
-                    } else {
+                    if cancel_tx.is_none() {
                         // See comment for the same clause under Ready(Some(Ok)).
+                        warn!(?key, "Failed service had no associated cancelation");
                         debug_assert!(cancel_tx.is_some());
-                        debug!("canceled endpoint removed on error");
                     }
+                    return Err(error::Failed(key, e.into())).into();
                 }
             }
         }
@@ -313,11 +324,14 @@ where
                     .ready
                     .swap_remove_index(index)
                     .expect("invalid ready index");
+                debug!(?key, "Ready service became pending");
                 // If a new version of this service has been added to the
                 // unready set, don't overwrite it.
                 if !self.pending_contains(&key) {
                     self.push_pending(key, svc, cancel);
+                } else {
+                    debug!(?key, "Service already has a pending replacement");
                 }
 
                 Ok(false)
@@ -356,6 +370,7 @@ where
             .ready
             .swap_remove_index(index)
             .expect("check_ready_index was not called");
+        debug!(?key, "Issuing request to ready service");
 
         let fut = svc.call(req);
 
@@ -363,6 +378,8 @@ where
         // If a new version of this service has been added to the
        // unready set, don't overwrite it.
         if !self.pending_contains(&key) {
             self.push_pending(key, svc, cancel);
+        } else {
+            debug!(?key, "Service already has a pending replacement");
         }
 
         fut
@@ -376,6 +393,7 @@ impl<K, S, Req> Unpin for Pending<K, S, Req> {}
 
 impl<K, S, Req> Future for Pending<K, S, Req>
 where
+    K: std::fmt::Debug,
     S: Service<Req>,
 {
     type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
@@ -383,8 +401,19 @@ where
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let mut fut = self.cancel.as_mut().expect("polled after complete");
         if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
-            assert!(r.is_ok(), "cancel sender lost");
             let key = self.key.take().expect("polled after complete");
+            if r.is_err() {
+                // XXX Similar to the scenario described in `ReadyCache::poll_pending`, it should
+                // not be possible for this future to be polled when the cancelation has been
+                // dropped (without it being notified). But, this scenario appears to occur in the
+                // wild.
+                //
+                // This isn't technically a problem, though--there's no reason we can't continue to
+                // fail this future as if it were canceled. It's just deeply weird. So, for now, we
+                // log a warning and panic in debug mode.
+                warn!(?key, "Pending service lost its cancelation");
+                debug_assert!(r.is_ok(), "cancel sender lost");
+            }
             return Err(PendingError::Canceled(key)).into();
         }
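
An illustrative note on the non-panicking behavior described in the commit
message: the change pairs a `tracing::warn!` event with a `debug_assert!`, so
the log line is recorded in every build while the assertion only aborts builds
that have debug assertions enabled (e.g. a default `cargo build`). The sketch
below shows that pattern in isolation; the function name and the
`tracing-subscriber` setup are assumptions for the example and are not part of
this patch or of tower.

    use tracing::warn;

    // Stand-in for the checks added in `poll_pending` above: `key` plays the
    // role of the cache's `K: Debug` key and `cancel_tx_present` the result
    // of looking up the cancelation handle for that key.
    fn note_missing_cancelation<K: std::fmt::Debug>(key: &K, cancel_tx_present: bool) {
        if !cancel_tx_present {
            // Emitted regardless of build profile, giving a clear signal in logs.
            warn!(?key, "Ready service had no associated cancelation");
            // Panics only when debug assertions are enabled; release builds
            // keep running past this point.
            debug_assert!(cancel_tx_present, "cancel sender lost");
        }
    }

    fn main() {
        // A subscriber is needed to observe the WARN event.
        tracing_subscriber::fmt::init();
        note_missing_cancelation(&"example-key", false);
    }

Run as a debug build, this logs the WARN event and then panics at the
assertion; a release build only logs the warning and continues, which is the
behavior this patch wants for the ready cache.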