/
dispatch.rs
123 lines (106 loc) · 3.88 KB
/
dispatch.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::error::{IdleError, ServiceError};
use crate::InFlight;
use futures::{prelude::*, select_biased};
use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::sync::Arc;
use tower::util::ServiceExt;
use tracing::trace;
pub(crate) async fn idle(max: std::time::Duration) -> IdleError {
tokio::time::delay_for(max).await;
IdleError(max)
}
pub(crate) async fn run<S, Req, I>(
mut service: S,
mut requests: mpsc::Receiver<InFlight<Req, S::Response>>,
idle: impl Fn() -> I,
) where
S: tower::Service<Req>,
S::Future: Send + 'static,
S::Error: Into<Error>,
I: std::future::Future,
I::Output: Into<Error>,
{
// Drive requests from the queue to the inner service.
loop {
select_biased! {
req = requests.recv().fuse() => {
match req {
None => return,
Some(InFlight { request, tx }) => {
match service.ready_and().await {
Ok(svc) => {
trace!("Dispatching request");
let _ = tx.send(Ok(Box::pin(svc.call(request).err_into::<Error>())));
}
Err(e) =>{
let error = ServiceError(Arc::new(e.into()));
trace!(%error, "Service failed");
// Fail this request.
let _ = tx.send(Err(error.clone().into()));
// Drain the queue and fail all remaining requests.
while let Some(InFlight { tx, .. }) = requests.recv().await {
let _ = tx.send(Err(error.clone().into()));
}
return;
}
};
}
}
}
e = idle().fuse() => {
let error = ServiceError(Arc::new(e.into()));
trace!(%error, "Idling out inner service");
break;
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::delay_for;
use tokio_test::{assert_pending, assert_ready, task};
use tower_test::mock;
#[tokio::test]
async fn idle_when_unused() {
let max_idle = Duration::from_millis(100);
let (tx, rx) = mpsc::channel(1);
let (inner, mut handle) = mock::pair::<(), ()>();
let mut dispatch = task::spawn(run(inner, rx, || idle(max_idle)));
handle.allow(1);
// Service ready without requests. Idle counter starts ticking.
assert_pending!(dispatch.poll());
delay_for(max_idle).await;
assert_ready!(dispatch.poll());
drop((tx, handle));
}
#[tokio::test]
async fn idle_reset_by_request() {
let max_idle = Duration::from_millis(100);
let (mut tx, rx) = mpsc::channel(1);
let (inner, mut handle) = mock::pair::<(), ()>();
let mut dispatch = task::spawn(run(inner, rx, || idle(max_idle)));
handle.allow(1);
// Service ready without requests. Idle counter starts ticking.
assert_pending!(dispatch.poll());
delay_for(max_idle).await;
// Send a request after the deadline has fired but before the
// dispatch future is polled. Ensure that the request is admitted,
// resetting idleness.
tx.send({
let (tx, _rx) = oneshot::channel();
super::InFlight { request: (), tx }
})
.await
.expect("request not sent");
assert_pending!(dispatch.poll());
handle.allow(1);
assert_pending!(dispatch.poll());
delay_for(max_idle).await;
assert_ready!(dispatch.poll());
drop((tx, handle));
}
}