diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs index 690e2723d..26212b1ac 100644 --- a/tower/src/balance/p2c/service.rs +++ b/tower/src/balance/p2c/service.rs @@ -117,7 +117,7 @@ where impl Balance where D: Discover + Unpin, - D::Key: Hash + Clone, + D::Key: Hash + Clone + std::fmt::Debug, D::Error: Into, D::Service: Service + Load, ::Metric: std::fmt::Debug, @@ -223,7 +223,7 @@ where impl Service for Balance where D: Discover + Unpin, - D::Key: Hash + Clone, + D::Key: Hash + Clone + std::fmt::Debug, D::Error: Into, D::Service: Service + Load, ::Metric: std::fmt::Debug, diff --git a/tower/src/ready_cache/cache.rs b/tower/src/ready_cache/cache.rs index 6cdaca85f..546234afb 100644 --- a/tower/src/ready_cache/cache.rs +++ b/tower/src/ready_cache/cache.rs @@ -11,7 +11,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::oneshot; use tower_service::Service; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; /// Drives readiness over a set of services. /// @@ -173,8 +173,12 @@ where /// Services are dropped from the ready set immediately. Services in the /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`] /// must be called to cause the service to be dropped. - pub fn evict>(&mut self, key: &Q) -> bool { + pub fn evict(&mut self, key: &Q) -> bool + where + Q: Hash + Equivalent + std::fmt::Debug, + { let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) { + debug!(?key, "Evicting pending service"); c.send(()).expect("cancel receiver lost"); true } else { @@ -182,15 +186,18 @@ where }; self.ready - .swap_remove_full(key) - .map(|_| true) + .swap_remove(key) + .map(|_| { + debug!(?key, "Evicted ready service"); + true + }) .unwrap_or(canceled) } } impl ReadyCache where - K: Clone + Eq + Hash, + K: Clone + Eq + Hash + std::fmt::Debug, S: Service, >::Error: Into, S::Error: Into, @@ -206,15 +213,19 @@ where /// /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending pub fn push(&mut self, key: K, svc: S) { + debug!(?key, "Adding service"); let cancel = oneshot::channel(); self.push_pending(key, svc, cancel); } fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) { if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) { + debug!(?key, "Canceling pending service for replacement"); // If there is already a service for this key, cancel it. c.send(()).expect("cancel receiver lost"); } + + trace!(?key, "Adding pending service"); self.pending.push(Pending { key: Some(key), cancel: Some(cancel_rx), @@ -243,12 +254,10 @@ where Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(Ok(())), Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => { - trace!("endpoint ready"); + trace!(?key, "Endpoint became ready"); let cancel_tx = self.pending_cancel_txs.swap_remove(&key); - if let Some(cancel_tx) = cancel_tx { - // Keep track of the cancelation so that it need not be - // recreated after the service is used. - self.ready.insert(key, (svc, (cancel_tx, cancel_rx))); + let (cancel_tx, cancel_rx) = if let Some(cancel_tx) = cancel_tx { + (cancel_tx, cancel_rx) } else { // This should not technically be possible. We must have decided to cancel // a Service (by sending on the CancelTx), yet that same service then @@ -260,27 +269,29 @@ where // So, we instead detect the endpoint as canceled at this point. That // should be fine, since the oneshot is only really there to ensure that // the Pending is polled again anyway. - // + warn!(?key, "Ready service had no associated cancelation"); // We assert that this can't happen in debug mode so that hopefully one day // we can find a test that triggers this reliably. debug_assert!(cancel_tx.is_some()); - debug!("canceled endpoint removed when ready"); - } + oneshot::channel() + }; + // Keep track of the cancelation so that it need not be recreated after the + // service is used. + self.ready.insert(key, (svc, (cancel_tx, cancel_rx))); } - Poll::Ready(Some(Err(PendingError::Canceled(_)))) => { - debug!("endpoint canceled"); + Poll::Ready(Some(Err(PendingError::Canceled(key)))) => { + debug!(?key, "Endpoint canceled"); // The cancellation for this service was removed in order to // cause this cancellation. } Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => { let cancel_tx = self.pending_cancel_txs.swap_remove(&key); - if cancel_tx.is_some() { - return Err(error::Failed(key, e.into())).into(); - } else { + if cancel_tx.is_none() { // See comment for the same clause under Ready(Some(Ok)). + warn!(?key, "Failed service had no associated cancelation"); debug_assert!(cancel_tx.is_some()); - debug!("canceled endpoint removed on error"); } + return Err(error::Failed(key, e.into())).into(); } } } @@ -324,11 +335,14 @@ where .ready .swap_remove_index(index) .expect("invalid ready index"); + debug!(?key, "Ready service became pending"); // If a new version of this service has been added to the // unready set, don't overwrite it. if !self.pending_contains(&key) { self.push_pending(key, svc, cancel); + } else { + debug!(?key, "Service already has a pending replacement"); } Ok(false) @@ -367,6 +381,7 @@ where .ready .swap_remove_index(index) .expect("check_ready_index was not called"); + debug!(?key, "Issuing request to ready service"); let fut = svc.call(req); @@ -374,6 +389,8 @@ where // unready set, don't overwrite it. if !self.pending_contains(&key) { self.push_pending(key, svc, cancel); + } else { + debug!(?key, "Service already has a pending replacement"); } fut @@ -387,6 +404,7 @@ impl Unpin for Pending {} impl Future for Pending where + K: std::fmt::Debug, S: Service, { type Output = Result<(K, S, CancelRx), PendingError>; @@ -394,8 +412,19 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut fut = self.cancel.as_mut().expect("polled after complete"); if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) { - assert!(r.is_ok(), "cancel sender lost"); let key = self.key.take().expect("polled after complete"); + if r.is_err() { + // XXX Similar to the scenario described in `ReadyCache::poll_pending`, it should + // not be possible for this future to be polled when the cancelation has been + // dropped (without it being notified). But, this scenario appears to occur in the + // wild. + // + // This isn't technically a problem, though--there's no reason we can't continue to + // fail this future as if it were canceled. It's just deeply weird. So, for now, we + // log a warning and panic in debug mode. + warn!(?key, "Pending service lost its cancelation"); + debug_assert!(r.is_ok(), "cancel sender lost"); + } return Err(PendingError::Canceled(key)).into(); }