-
Notifications
You must be signed in to change notification settings - Fork 264
/
service.rs
77 lines (71 loc) · 2.33 KB
/
service.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::Metrics;
use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::time::Instant;
#[derive(Debug)]
pub struct TrackService<S> {
inner: S,
metrics: Arc<Metrics>,
blocked_since: Option<Instant>,
}
impl<S> TrackService<S> {
pub(crate) fn new(inner: S, metrics: Arc<Metrics>) -> Self {
Self {
inner,
metrics,
blocked_since: None,
}
}
}
impl<T, S> tower::Service<T> for TrackService<S>
where
S: tower::Service<T>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.inner.poll_ready(cx) {
Poll::Pending => {
self.metrics.not_ready_total.incr();
// If the service was already pending, then add the time we
// waited and reset blocked_since. This allows the value to be
// updated even when we're "stuck" in pending.
let now = Instant::now();
if let Some(t0) = self.blocked_since.take() {
let not_ready = now.saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
self.blocked_since = Some(now);
Poll::Pending
}
Poll::Ready(Ok(())) => {
self.metrics.ready_total.incr();
if let Some(t0) = self.blocked_since.take() {
let not_ready = Instant::now().saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
self.metrics.error_total.incr();
if let Some(t0) = self.blocked_since.take() {
let not_ready = Instant::now().saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
Poll::Ready(Err(e))
}
}
}
#[inline]
fn call(&mut self, target: T) -> Self::Future {
self.inner.call(target)
}
}
impl<S> Drop for TrackService<S> {
fn drop(&mut self) {
self.metrics.drop_total.incr();
}
}