forked from tokio-rs/tokio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
count.rs
119 lines (91 loc) · 3.14 KB
/
count.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use proc_macro::TokenStream;
use proc_macro::{Punct, TokenTree};
use quote::quote;
use syn::parse::{Parse, ParseStream};
use syn::{Expr, Token};
/// Expands to an integer literal holding the number of top-level token trees
/// produced by iterating over `input`.
///
/// The literal is generated from a `usize` value, so it carries that type in
/// the expansion.
pub(crate) fn count(input: TokenStream) -> TokenStream {
    // Tally one per token tree yielded by the stream's iterator.
    let token_trees = input.into_iter().fold(0usize, |seen, _| seen + 1);
    quote!(#token_trees).into()
}
/// Parsed macro input: a comma-separated list of expressions, optionally
/// followed by a trailing comma.
struct Join {
    /// The parsed expressions, in the order they appeared in the input.
    fut_exprs: Vec<Expr>,
}

impl Parse for Join {
    /// Parses `expr, expr, ...` until the input is exhausted.
    ///
    /// # Errors
    ///
    /// Returns the underlying `syn` error if an element is not a valid
    /// expression or if two expressions are not separated by a comma.
    fn parse(input: ParseStream<'_>) -> syn::Result<Self> {
        // `parse_terminated` alternates between an element and a separator and
        // stops as soon as the input is empty, so a trailing comma is accepted.
        let fut_exprs = syn::punctuated::Punctuated::<Expr, Token![,]>::parse_terminated(input)?
            .into_iter()
            .collect();
        Ok(Self { fut_exprs })
    }
}
/// Expands a comma-separated list of future expressions into an expression
/// that drives all of them from a single `poll_fn` and evaluates to a tuple of
/// their outputs.
///
/// Every input expression is wrapped in `maybe_done`. On each poll the
/// generated closure polls every wrapped future once, beginning at
/// `start_index`; whenever at least one future is still pending,
/// `start_index` is advanced by one so the next poll begins with a different
/// future. Once nothing is pending, the outputs are taken out of the wrappers
/// and returned as a tuple (a 1-tuple for a single input).
///
/// The expansion ends in `.await`, so it must be used in an async context.
/// Empty input expands to `async {}.await`. Input that fails to parse as
/// `Join` expands to the compile error produced by `syn::parse_macro_input!`.
pub(crate) fn test(input: TokenStream) -> TokenStream {
    let parsed = syn::parse_macro_input!(input as Join);
    let futures = parsed.fut_exprs;
    let futures_count = futures.len();

    // With no futures the rotation arithmetic below would compute `% 0` on a
    // constant, so short-circuit to an expression that simply yields `()`.
    if futures_count == 0 {
        return TokenStream::from(quote! { async {}.await });
    }

    // One `if` per future; the one whose position matches `turn` is polled.
    let if_statements = (0..futures_count).map(|i| {
        // Tuple fields must be spelled with an unsuffixed index (`.0`), which
        // interpolating a bare `usize` (`0usize`) would not produce.
        let field = syn::Index::from(i);
        quote! {
            if #i == turn {
                let fut = &mut futures.#field;
                // SAFETY: the tuple holding this future is a local of the
                // generated block and is only ever accessed through a
                // reference, so the future is never moved.
                let mut fut = unsafe { Pin::new_unchecked(fut) };
                // Try polling
                if fut.poll(cx).is_pending() {
                    is_pending = true;
                }
                continue;
            }
        }
    });

    // One block expression per future, each yielding that future's output.
    let ready_output = (0..futures_count).map(|i| {
        let field = syn::Index::from(i);
        quote! {{
            let fut = &mut futures.#field;
            // SAFETY: the tuple holding this future is a local of the
            // generated block and is only ever accessed through a
            // reference, so the future is never moved.
            let mut fut = unsafe { Pin::new_unchecked(fut) };
            fut.take_output().expect("expected completed future")
        }}
    });

    TokenStream::from(quote! {{
        use tokio::macros::support::{maybe_done, poll_fn, Future, Pin};
        use tokio::macros::support::Poll::{Ready, Pending};

        // Safety: nothing must be moved out of `futures`. This is to satisfy
        // the requirement of `Pin::new_unchecked` called below.
        //
        // The comma after every element keeps this a tuple even when only a
        // single future was passed.
        let mut futures = ( #( maybe_done(#futures), )* );

        // Hand the `move` closure a reference rather than the tuple itself, so
        // that moving the closure does not move the futures.
        let futures = &mut futures;

        // How many futures were passed in.
        const FUTURE_COUNT: usize = #futures_count;

        // When poll_fn is polled, start polling the future at this index.
        let mut start_index: usize = 0;

        poll_fn(move |cx| {
            let mut is_pending = false;

            for i in 0..FUTURE_COUNT {
                let turn;

                #[allow(clippy::modulo_one)]
                {
                    turn = (start_index + i) % FUTURE_COUNT
                };

                #( #if_statements )*
            }

            if is_pending {
                // Start by polling the next future first the next time poll_fn is polled
                #[allow(clippy::modulo_one)]
                {
                    start_index = (start_index + 1) % FUTURE_COUNT;
                }

                Pending
            } else {
                Ready(( #( #ready_output, )* ))
            }
        }).await
    }})
}