Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DirectService from tower-{balance,buffer} #159

Merged
merged 2 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion tower-balance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ log = "0.4.1"
rand = "0.6"
tokio-timer = "0.2.4"
tower-service = { version = "0.2", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tower-discover = { version = "0.1", path = "../tower-discover" }
indexmap = "1"

Expand Down
157 changes: 35 additions & 122 deletions tower-balance/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ extern crate indexmap;
extern crate quickcheck;
extern crate rand;
extern crate tokio_timer;
extern crate tower_direct_service;
extern crate tower_discover;
extern crate tower_service;

Expand All @@ -16,7 +15,6 @@ use indexmap::IndexMap;
use rand::{rngs::SmallRng, SeedableRng};
use std::marker::PhantomData;
use std::{error, fmt};
use tower_direct_service::DirectService;
use tower_discover::Discover;
use tower_service::Service;

Expand Down Expand Up @@ -187,9 +185,11 @@ where
///
/// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
/// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
fn promote_to_ready<F, E>(&mut self, mut poll_ready: F) -> Result<(), Error<E, D::Error>>
fn promote_to_ready<Request>(
&mut self,
) -> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
D::Service: Service<Request>,
{
let n = self.not_ready.len();
if n == 0 {
Expand All @@ -206,7 +206,7 @@ where
.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
poll_ready(svc).map_err(Error::Inner)?.is_ready()
svc.poll_ready().map_err(Error::Inner)?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
Expand All @@ -230,17 +230,16 @@ where
///
/// If the service exists in `ready` and does not poll as ready, it is moved to
/// `not_ready`, potentially altering the order of `ready` and/or `not_ready`.
fn poll_ready_index<F, E>(
fn poll_ready_index<Request>(
&mut self,
idx: usize,
mut poll_ready: F,
) -> Option<Poll<(), Error<E, D::Error>>>
) -> Option<Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
D::Service: Service<Request>,
{
match self.ready.get_index_mut(idx) {
None => return None,
Some((_, svc)) => match poll_ready(svc) {
Some((_, svc)) => match svc.poll_ready() {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Ok(Async::NotReady) => {}
Expand All @@ -258,9 +257,11 @@ where
/// Chooses the next service to which a request will be dispatched.
///
/// Ensures that .
fn choose_and_poll_ready<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
fn choose_and_poll_ready<Request>(
&mut self,
) -> Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
D::Service: Service<Request>,
{
loop {
let n = self.ready.len();
Expand All @@ -276,7 +277,7 @@ where

// XXX Should we handle per-endpoint errors?
if self
.poll_ready_index(idx, &mut poll_ready)
.poll_ready_index(idx)
.expect("invalid ready index")?
.is_ready()
{
Expand All @@ -285,143 +286,55 @@ where
}
}
}
}

fn poll_ready_inner<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
impl<D, C, Request> Service<Request> for Balance<D, C>
where
D: Discover,
D::Service: Service<Request>,
C: Choose<D::Key, D::Service>,
{
type Response = <D::Service as Service<Request>>::Response;
type Error = Error<<D::Service as Service<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as Service<Request>>::Future, D::Error>;

/// Prepares the balancer to process a request.
///
/// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// Clear before `ready` is altered.
self.chosen_ready_index = None;

// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx, &mut poll_ready)
self.poll_ready_index(idx)
.expect("invalid dispatched ready key")?;
}

// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready(&mut poll_ready)?;
self.promote_to_ready()?;

// Choose the next service to be used by `call`.
self.choose_and_poll_ready(&mut poll_ready)
self.choose_and_poll_ready()
}

fn call<Request, F, FF>(&mut self, call: F, request: Request) -> ResponseFuture<FF, D::Error>
where
F: FnOnce(&mut D::Service, Request) -> FF,
FF: Future,
{
fn call(&mut self, request: Request) -> Self::Future {
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self
.ready
.get_index_mut(idx)
.expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);

let rsp = call(svc, request);
let rsp = svc.call(request);
ResponseFuture(rsp, PhantomData)
}
}

impl<D, C, Request> Service<Request> for Balance<D, C>
where
D: Discover,
D::Service: Service<Request>,
C: Choose<D::Key, D::Service>,
{
type Response = <D::Service as Service<Request>>::Response;
type Error = Error<<D::Service as Service<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as Service<Request>>::Future, D::Error>;

/// Prepares the balancer to process a request.
///
/// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.poll_ready_inner(D::Service::poll_ready)
}

fn call(&mut self, request: Request) -> Self::Future {
self.call(D::Service::call, request)
}
}

impl<D, C, Request> DirectService<Request> for Balance<D, C>
where
D: Discover,
D::Service: DirectService<Request>,
C: Choose<D::Key, D::Service>,
{
type Response = <D::Service as DirectService<Request>>::Response;
type Error = Error<<D::Service as DirectService<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as DirectService<Request>>::Future, D::Error>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.poll_ready_inner(D::Service::poll_ready)
}

fn call(&mut self, request: Request) -> Self::Future {
self.call(D::Service::call, request)
}

fn poll_service(&mut self) -> Poll<(), Self::Error> {
let mut any_not_ready = false;

// TODO: don't re-poll services that return Ready until call is invoked on them

for (_, svc) in &mut self.ready {
if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
any_not_ready = true;
}
}

for (_, svc) in &mut self.not_ready {
if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
any_not_ready = true;
}
}

if any_not_ready {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}

fn poll_close(&mut self) -> Poll<(), Self::Error> {
let mut err = None;
self.ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});
self.not_ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});

if let Some(e) = err {
return Err(Error::Inner(e));
}

if self.ready.is_empty() && self.not_ready.is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}

// ===== impl ResponseFuture =====

impl<F: Future, E> Future for ResponseFuture<F, E> {
Expand Down
1 change: 0 additions & 1 deletion tower-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tokio-executor = "0.1"
lazycell = "1.2"
tokio-sync = "0.1"
Expand Down