diff --git a/tower/src/ready_cache/cache.rs b/tower/src/ready_cache/cache.rs index 518a11edb..8070d7807 100644 --- a/tower/src/ready_cache/cache.rs +++ b/tower/src/ready_cache/cache.rs @@ -2,15 +2,16 @@ use super::error; use futures_core::Stream; -use futures_util::stream::FuturesUnordered; +use futures_util::{stream::FuturesUnordered, task::AtomicWaker}; pub use indexmap::Equivalent; use indexmap::IndexMap; use std::fmt; use std::future::Future; use std::hash::Hash; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::sync::oneshot; use tower_service::Service; use tracing::{debug, trace}; @@ -75,8 +76,18 @@ where // Safety: This is safe because we do not use `Pin::new_unchecked`. impl Unpin for ReadyCache {} -type CancelRx = oneshot::Receiver<()>; -type CancelTx = oneshot::Sender<()>; +#[derive(Debug)] +struct Cancel { + waker: AtomicWaker, + canceled: AtomicBool, +} + +#[derive(Debug)] +struct CancelRx(Arc); + +#[derive(Debug)] +struct CancelTx(Arc); + type CancelPair = (CancelTx, CancelRx); #[derive(Debug)] @@ -195,7 +206,7 @@ where /// must be called to cause the service to be dropped. pub fn evict>(&mut self, key: &Q) -> bool { let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) { - c.send(()).expect("cancel receiver lost"); + c.cancel(); true } else { false @@ -226,14 +237,14 @@ where /// /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending pub fn push(&mut self, key: K, svc: S) { - let cancel = oneshot::channel(); + let cancel = cancelable(); self.push_pending(key, svc, cancel); } fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) { if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) { // If there is already a service for this key, cancel it. 
- c.send(()).expect("cancel receiver lost"); + c.cancel(); } self.pending.push(Pending { key: Some(key), @@ -270,21 +281,10 @@ where // recreated after the service is used. self.ready.insert(key, (svc, (cancel_tx, cancel_rx))); } else { - // This should not technically be possible. We must have decided to cancel - // a Service (by sending on the CancelTx), yet that same service then - // returns Ready. Since polling a Pending _first_ polls the CancelRx, that - // _should_ always see our CancelTx send. Yet empirically, that isn't true: - // - // https://github.com/tower-rs/tower/issues/415 - // - // So, we instead detect the endpoint as canceled at this point. That - // should be fine, since the oneshot is only really there to ensure that - // the Pending is polled again anyway. - // - // We assert that this can't happen in debug mode so that hopefully one day - // we can find a test that triggers this reliably. - debug_assert!(cancel_tx.is_some()); - debug!("canceled endpoint removed when ready"); + assert!( + cancel_tx.is_some(), + "services that become ready must have a pending cancelation" + ); } } Poll::Ready(Some(Err(PendingError::Canceled(_)))) => { @@ -294,13 +294,11 @@ where } Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => { let cancel_tx = self.pending_cancel_txs.swap_remove(&key); - if cancel_tx.is_some() { - return Err(error::Failed(key, e.into())).into(); - } else { - // See comment for the same clause under Ready(Some(Ok)). - debug_assert!(cancel_tx.is_some()); - debug!("canceled endpoint removed on error"); - } + assert!( + cancel_tx.is_some(), + "services that return an error must have a pending cancelation" + ); + return Err(error::Failed(key, e.into())).into(); } } } @@ -400,6 +398,28 @@ where } } +// === impl Cancel === + +/// Creates a cancelation sender and receiver. +/// +/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to +/// observe results as soon as a `Sender` fires. 
Using an `AtomicBool` allows +/// the state to be observed as soon as the cancelation is triggered. +fn cancelable() -> CancelPair { + let cx = Arc::new(Cancel { + waker: AtomicWaker::new(), + canceled: AtomicBool::new(false), + }); + (CancelTx(cx.clone()), CancelRx(cx)) +} + +impl CancelTx { + fn cancel(self) { + self.0.canceled.store(true, Ordering::SeqCst); + self.0.waker.wake(); + } +} + // === Pending === impl Future for Pending @@ -410,9 +430,10 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let mut fut = this.cancel.as_mut().expect("polled after complete"); - if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) { - assert!(r.is_ok(), "cancel sender lost"); + // Before checking whether the service is ready, check to see whether + // readiness has been canceled. + let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete"); + if cancel.canceled.load(Ordering::SeqCst) { let key = this.key.take().expect("polled after complete"); return Err(PendingError::Canceled(key)).into(); } @@ -423,7 +444,21 @@ where .expect("polled after ready") .poll_ready(cx) { - Poll::Pending => Poll::Pending, + Poll::Pending => { + // Before returning Pending, register interest in cancelation so + // that this future is polled again if the state changes. + let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete"); + cancel.waker.register(cx.waker()); + // Because both the cancel receiver and cancel sender are held + // by the `ReadyCache` (i.e., on a single task), then it must + // not be possible for the cancelation state to change while + // polling a `Pending` service. 
+ assert!( + !cancel.canceled.load(Ordering::SeqCst), + "cancelation cannot be notified while polling a pending service" + ); + Poll::Pending + } Poll::Ready(Ok(())) => { let key = this.key.take().expect("polled after complete"); let cancel = this.cancel.take().expect("polled after complete"); diff --git a/tower/tests/ready_cache/main.rs b/tower/tests/ready_cache/main.rs index f7acbf81c..56be6a9cb 100644 --- a/tower/tests/ready_cache/main.rs +++ b/tower/tests/ready_cache/main.rs @@ -2,8 +2,9 @@ #[path = "../support.rs"] mod support; +use std::pin::Pin; use tokio_test::{assert_pending, assert_ready, task}; -use tower::ready_cache::ReadyCache; +use tower::ready_cache::{error, ReadyCache}; use tower_test::mock; type Req = &'static str; @@ -191,3 +192,32 @@ fn duplicate_key_by_index() { // _and_ service 0 should now be callable assert!(task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap()); } + +// Tests https://github.com/tower-rs/tower/issues/415 +#[tokio::test(flavor = "current_thread")] +async fn cancelation_observed() { + let mut cache = ReadyCache::default(); + let mut handles = vec![]; + + // NOTE This test passes at 129 items, but fails at 130 items (if coop + // scheduling interferes with cancelation). + for _ in 0..130 { + let (svc, mut handle) = tower_test::mock::pair::<(), ()>(); + handle.allow(1); + cache.push("ep0", svc); + handles.push(handle); + } + + struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>); + impl Unpin for Ready {} + impl std::future::Future for Ready { + type Output = Result<(), error::Failed<&'static str>>; + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.get_mut().0.poll_pending(cx) + } + } + Ready(cache).await.unwrap(); +}