forked from tokio-rs/tokio
/
context.rs
83 lines (77 loc) · 2.4 KB
/
context.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//! This module introduces `TokioContext`, which enables a simple way of using
//! the tokio runtime with any other executor.
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It pairs a future with a `Handle` to the runtime, so that every poll of
/// the future happens inside that handle's context. A handle to the runtime
/// can be obtained by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
// The wrapped future. `#[pin]` makes the generated projection hand it out
// pinned, which is what allows it to be polled through `Pin<&mut Self>`.
#[pin]
inner: F,
// Runtime handle that is entered around each poll of `inner`.
handle: Handle,
}
}
impl<F: Future> Future for TokioContext<F> {
    type Output = F::Output;

    /// Polls the wrapped future from inside the runtime handle's context.
    ///
    /// The handle is entered only for the duration of this one poll, and the
    /// inner future's `Poll` result is returned as-is.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project once, then move each field into its own local so the closure
        // captures just the pinned future while the handle is borrowed by the
        // call to `enter`.
        let this = self.project();
        let future = this.inner;
        let runtime = this.handle;
        runtime.enter(|| future.poll(cx))
    }
}
/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt: Into<Handle> + Clone {
    /// Convenience method that takes a `Future` and returns a `TokioContext`.
    ///
    /// `self` is only borrowed: it is cloned and the clone is converted into
    /// the `Handle` stored in the returned `TokioContext`, alongside `fut`.
    ///
    /// # Example: calling Tokio Runtime from a custom ThreadPool
    ///
    /// ```ignore
    /// use std::future::Future;
    /// use tokio_util::context::HandleExt;
    /// use tokio::runtime::{Builder, Runtime};
    /// use tokio::time::{delay_for, Duration};
    ///
    /// struct ThreadPool {
    ///     pub inner: futures::executor::ThreadPool,
    ///     pub rt: Runtime,
    /// }
    ///
    /// impl ThreadPool {
    ///     fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
    ///         let handle = self.rt.handle().clone();
    ///         // create a TokioContext from the handle and future.
    ///         let h = handle.wrap(f);
    ///         self.inner.spawn_ok(h);
    ///     }
    /// }
    ///
    /// let rt = Builder::new()
    ///     .threaded_scheduler()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// let inner = futures::executor::ThreadPool::builder().create().unwrap();
    /// let executor = ThreadPool { inner, rt };
    ///
    /// executor.spawn(async {
    ///     delay_for(Duration::from_millis(2)).await
    /// });
    /// ```
    fn wrap<F>(&self, fut: F) -> TokioContext<F>
    where
        F: Future,
    {
        TokioContext {
            inner: fut,
            // Clone first so the caller keeps its handle, then convert the
            // owned clone through the `Into<Handle>` supertrait bound.
            handle: self.clone().into(),
        }
    }
}
// `Handle` needs no method bodies of its own: the empty impl picks up the
// default `wrap` provided by `HandleExt`.
impl HandleExt for Handle {}