Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: Add TokioContext future (#2791) #2958

Merged
merged 2 commits into from Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions tokio-macros/src/entry.rs
@@ -1,3 +1,5 @@
#![allow(clippy::unnecessary_lazy_evaluations)]

use proc_macro::TokenStream;
use quote::quote;
use std::num::NonZeroUsize;
Expand Down
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Expand Up @@ -29,6 +29,7 @@ full = ["codec", "udp", "compat"]
compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
rt = ["tokio/rt-core"]

[dependencies]
tokio = { version = "0.2.5", path = "../tokio" }
Expand Down
10 changes: 10 additions & 0 deletions tokio-util/src/cfg.rs
Expand Up @@ -18,6 +18,16 @@ macro_rules! cfg_compat {
}
}

macro_rules! cfg_rt {
($($item:item)*) => {
$(
#[cfg(feature = "rt")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
$item
)*
}
}

macro_rules! cfg_udp {
($($item:item)*) => {
$(
Expand Down
78 changes: 78 additions & 0 deletions tokio-util/src/context.rs
@@ -0,0 +1,78 @@
//! Tokio context aware futures utilities.
//!
//! This module includes utilities around integrating tokio with other runtimes
//! by allowing the context to be attached to futures. This allows spawning
//! futures on other executors while still using tokio to drive them. This
//! can be useful if you need to use a tokio based library in an executor/runtime
//! that does not provide a tokio context.

use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;

pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
#[pin]
inner: F,
handle: Handle,
}
}

impl<F: Future> Future for TokioContext<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let handle = me.handle;
let fut = me.inner;

handle.enter(|| fut.poll(cx))
}
}

/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
/// use tokio_util::context::HandleExt;
/// use tokio::time::{delay_for, Duration};
///
/// let mut rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
///
/// let rt2 = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .build().unwrap();
///
/// let fut = delay_for(Duration::from_millis(2));
///
/// rt.block_on(
/// rt2
/// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
}

impl HandleExt for Handle {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
TokioContext {
inner: fut,
handle: self.clone(),
}
}
}
4 changes: 4 additions & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -35,3 +35,7 @@ cfg_udp! {
cfg_compat! {
pub mod compat;
}

cfg_rt! {
pub mod context;
}
29 changes: 29 additions & 0 deletions tokio-util/tests/context.rs
@@ -0,0 +1,29 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "rt")]

use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;

#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
.build()
.unwrap();
let rt2 = Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.unwrap();

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
}
4 changes: 3 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -2,7 +2,9 @@
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
clippy::needless_doctest_main
clippy::needless_doctest_main,
clippy::match_like_matches_macro,
clippy::stable_sort_primitive
)]
#![warn(
missing_debug_implementations,
Expand Down
3 changes: 2 additions & 1 deletion tokio/tests/macros_select.rs
@@ -1,4 +1,5 @@
#![allow(clippy::blacklisted_name)]
#![allow(clippy::blacklisted_name, clippy::stable_sort_primitive)]

use tokio::sync::{mpsc, oneshot};
use tokio::task;
use tokio_test::{assert_ok, assert_pending, assert_ready};
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/rt_common.rs
@@ -1,4 +1,4 @@
#![allow(clippy::needless_range_loop)]
#![allow(clippy::needless_range_loop, clippy::stable_sort_primitive)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

Expand Down
10 changes: 6 additions & 4 deletions tokio/tests/stream_stream_map.rs
@@ -1,3 +1,5 @@
#![allow(clippy::stable_sort_primitive)]

use tokio::stream::{self, pending, Stream, StreamExt, StreamMap};
use tokio::sync::mpsc;
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
Expand Down Expand Up @@ -213,17 +215,17 @@ fn new_capacity_zero() {
let map = StreamMap::<&str, stream::Pending<()>>::new();
assert_eq!(0, map.capacity());

let keys = map.keys().collect::<Vec<_>>();
assert!(keys.is_empty());
let mut keys = map.keys();
assert!(keys.next().is_none());
}

#[test]
fn with_capacity() {
let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
assert!(10 <= map.capacity());

let keys = map.keys().collect::<Vec<_>>();
assert!(keys.is_empty());
let mut keys = map.keys();
assert!(keys.next().is_none());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/sync_broadcast.rs
@@ -1,4 +1,4 @@
#![allow(clippy::cognitive_complexity)]
#![allow(clippy::cognitive_complexity, clippy::match_like_matches_macro)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/time_delay_queue.rs
@@ -1,4 +1,4 @@
#![allow(clippy::blacklisted_name)]
#![allow(clippy::blacklisted_name, clippy::stable_sort_primitive)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

Expand Down