-
-
Notifications
You must be signed in to change notification settings - Fork 709
/
and.rs
87 lines (77 loc) · 2.78 KB
/
and.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;
use futures::ready;
use pin_project::{pin_project, project};
use super::{Combine, Filter, FilterBase, HList, Tuple};
use crate::reject::CombineRejection;
/// Combinator holding two values, `first` and `second`.
///
/// When both are filters (see the `FilterBase` impl for this type), `first`
/// is run before `second` and the values they extract are merged into one
/// tuple.
#[derive(Clone, Copy, Debug)]
pub struct And<T, U> {
// The side that is run first. Visible to the parent module only.
pub(super) first: T,
// The side that is run only after `first` has succeeded. Visible to the
// parent module only.
pub(super) second: U,
}
/// Makes `And<T, U>` itself a filter: it extracts the flattened combination
/// of what `T` and `U` extract, and fails with the single rejection type that
/// `CombineRejection` selects for `U::Error` and `T::Error`.
impl<T, U> FilterBase for And<T, U>
where
    T: Filter,
    T::Extract: Send,
    U: Filter + Clone + Send,
    <T::Extract as Tuple>::HList: Combine<<U::Extract as Tuple>::HList> + Send,
    <<<T::Extract as Tuple>::HList as Combine<<U::Extract as Tuple>::HList>>::Output as HList>::Tuple: Send,
    U::Error: CombineRejection<T::Error>,
{
    // `T`'s extract and `U`'s extract, each viewed as an HList, combined and
    // converted back into a tuple.
    type Extract = <<<T::Extract as Tuple>::HList as Combine<<U::Extract as Tuple>::HList>>::Output as HList>::Tuple;
    type Error = <U::Error as CombineRejection<T::Error>>::One;
    type Future = AndFuture<T, U>;

    /// Builds the future for one run of this combinator.
    ///
    /// The first filter's future is created right away. The second filter is
    /// only cloned here; its own `filter()` is invoked later by
    /// `AndFuture::poll`, once the first future has resolved successfully.
    fn filter(&self) -> Self::Future {
        let first_future = self.first.filter();
        let second_filter = self.second.clone();
        AndFuture {
            state: State::First(first_future, second_filter),
        }
    }
}
/// Future returned by `And::filter`; a thin wrapper around the `State`
/// machine that tracks which of the two filters is currently being driven.
// No `Debug` impl is provided for this type, so the lint is silenced.
// NOTE(review): presumably because the inner `T::Future` / `U::Future` are
// not required to be `Debug` — confirm.
#[allow(missing_debug_implementations)]
#[pin_project]
pub struct AndFuture<T: Filter, U: Filter> {
// Structurally pinned so the futures stored inside `State` can be polled
// through the projection.
#[pin]
state: State<T, U>,
}
/// Progress of an `AndFuture`.
#[pin_project]
enum State<T: Filter, U: Filter> {
/// The first filter's future is in flight; the second filter is held
/// (not yet started) until the first one succeeds.
First(#[pin] T::Future, U),
/// The first filter has produced its extract (stored in the `Option` so it
/// can be moved out with `take()`), and the second filter's future is in
/// flight.
Second(Option<T::Extract>, #[pin] U::Future),
/// The combined result has been returned; `poll` panics in this state.
Done,
}
/// Drives the two filters in sequence: the first filter's future, then the
/// future obtained by calling `filter()` on the stored second filter.
impl<T, U> Future for AndFuture<T, U>
where
T: Filter,
U: Filter,
<T::Extract as Tuple>::HList: Combine<<U::Extract as Tuple>::HList> + Send,
U::Error: CombineRejection<T::Error>,
{
// Ok: both extracts converted to HLists, combined, and flattened back into
// a tuple. Err: the rejection type `CombineRejection` selects for
// `U::Error` and `T::Error`.
type Output = Result<
<<<T::Extract as Tuple>::HList as Combine<<U::Extract as Tuple>::HList>>::Output as HList>::Tuple,
<U::Error as CombineRejection<T::Error>>::One>;
/// Advances the state machine as far as possible in one call.
///
/// Returns `Poll::Pending` (via `ready!`) whenever the future currently
/// being driven is not ready. An error from either future is converted
/// with `From::from` and returned immediately; the state is left as it was
/// on those paths.
///
/// # Panics
///
/// Panics with "polled after complete" if polled again after the combined
/// `Ok` value has been returned.
#[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Loop so that, once the first future completes, the freshly created
// second future is polled within the same `poll` call.
loop {
// Re-project on every iteration: `self.set(..)` below replaces the
// whole value, so a projection from a previous pass cannot be reused.
let pin = self.as_mut().project();
#[project]
let (ex1, fut2) = match pin.state.project() {
State::First(first, second) => match ready!(first.poll(cx)) {
// First filter succeeded: start the second filter now and carry
// the first extract along to the `Second` state.
Ok(first) => (first, second.filter()),
Err(err) => return Poll::Ready(Err(From::from(err)))
}
State::Second(ex1, second) => {
let ex2 = match ready!(second.poll(cx)) {
Ok(second) => second,
Err(err) => return Poll::Ready(Err(From::from(err)))
};
// `take()` moves the stored first extract out through the projected
// reference. The `unwrap` relies on `Second` being built with
// `Some(..)`, which is the only construction visible in this file.
let ex3 = ex1.take().unwrap().hlist().combine(ex2.hlist()).flatten();
// Mark completion so that a further poll hits the `Done` arm.
self.set(AndFuture{ state: State::Done });
return Poll::Ready(Ok(ex3));
}
State::Done => panic!("polled after complete"),
};
// Only reached from the `First` arm's `Ok` case: switch to `Second`
// and go around the loop to poll the second future.
self.set(AndFuture{ state: State::Second(Some(ex1), fut2) });
}
}
}