Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ready_cache: just use pin_project for Pending #667

Merged
merged 3 commits into from Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tower/Cargo.toml
Expand Up @@ -56,7 +56,7 @@ limit = ["__common", "tokio/time", "tokio/sync", "tokio-util", "tracing"]
load = ["__common", "tokio/time", "tracing"]
load-shed = ["__common"]
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tokio/io-std", "tracing"]
retry = ["__common", "tokio/time"]
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
Expand Down
38 changes: 19 additions & 19 deletions tower/src/ready_cache/cache.rs
Expand Up @@ -85,14 +85,16 @@ enum PendingError<K, E> {
Inner(K, E),
}

/// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: Option<S>,
_pd: std::marker::PhantomData<Req>,
pin_project_lite::pin_project! {
/// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: Option<S>,
_pd: std::marker::PhantomData<Req>,
}
}

// === ReadyCache ===
Expand Down Expand Up @@ -400,37 +402,35 @@ where

// === Pending ===

// Safety: No use unsafe access therefore this is safe.
impl<K, S, Req> Unpin for Pending<K, S, Req> {}

impl<K, S, Req> Future for Pending<K, S, Req>
where
S: Service<Req>,
{
type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut fut = self.cancel.as_mut().expect("polled after complete");
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut fut = this.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
let key = self.key.take().expect("polled after complete");
let key = this.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key)).into();
}

match self
match this
.ready
.as_mut()
.expect("polled after ready")
.poll_ready(cx)
{
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
let key = self.key.take().expect("polled after complete");
let cancel = self.cancel.take().expect("polled after complete");
Ok((key, self.ready.take().expect("polled after ready"), cancel)).into()
let key = this.key.take().expect("polled after complete");
let cancel = this.cancel.take().expect("polled after complete");
Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
}
Poll::Ready(Err(e)) => {
let key = self.key.take().expect("polled after compete");
let key = this.key.take().expect("polled after compete");
Err(PendingError::Inner(key, e)).into()
}
}
Expand Down