Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: Add TokioContext future #2791

Merged
merged 18 commits into from Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion tokio-util/Cargo.toml
Expand Up @@ -42,10 +42,11 @@ log = "0.4"
pin-project-lite = "0.1.4"

[dev-dependencies]
lazy_static = "1.4.0"
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
tokio-test = { version = "0.3.0", path = "../tokio-test" }

futures = "0.3.0"
futures = { version = "0.3.0", features = ["thread-pool"] }
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
futures-test = "0.3.5"

[package.metadata.docs.rs]
Expand Down
38 changes: 38 additions & 0 deletions tokio-util/src/context.rs
@@ -0,0 +1,38 @@
//! This module introduces `TokioContext`, which enables a simple way of using
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
//! the tokio runtime with any other executor.
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved

use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;

pin_project! {
/// TODO: add docs. Get some inspiration?
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
pub struct TokioContext<F> {
#[pin]
inner: F,
handle: Handle,
}
}

impl<F: Future> Future for TokioContext<F> {
type Output = F::Output;
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let handle = me.handle;
let fut = me.inner;

handle.enter(|| fut.poll(cx))
}
}
blasrodri marked this conversation as resolved.
Show resolved Hide resolved

blasrodri marked this conversation as resolved.
Show resolved Hide resolved
impl<F: Future> TokioContext<F> {
/// Creates a new `TokioContext`. Expects a future as its first argument, and a
/// `tokio::runtime::Handle` as a second.
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(f: F, handle: Handle) -> Self {
Self { inner: f, handle }
}
}
2 changes: 2 additions & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -38,4 +38,6 @@ cfg_compat! {
pub mod compat;
}

pub mod context;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw do you think we should feature flag this around rt?


pub mod sync;
49 changes: 49 additions & 0 deletions tokio-util/tests/context.rs
@@ -0,0 +1,49 @@
#![warn(rust_2018_idioms)]

use tokio::{net::TcpListener, sync::oneshot};
use tokio_util::context::TokioContext;

use lazy_static::lazy_static;
use std::future::Future;

struct ThreadPool {
inner: futures::executor::ThreadPool,
rt: tokio::runtime::Runtime,
}

lazy_static! {
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
static ref EXECUTOR: ThreadPool = {
// Spawn tokio runtime on a single background thread
// enabling IO and timers.
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.core_threads(1)
.build()
.unwrap();

let inner = futures::executor::ThreadPool::builder().create().unwrap();

ThreadPool { inner, rt }
};
}

impl ThreadPool {
fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
let handle = self.rt.handle().clone();
self.inner.spawn_ok(TokioContext::new(f, handle));
}
}

#[test]
fn tokio_context_with_another_runtime() {
let (tx, rx) = oneshot::channel();

EXECUTOR.spawn(async move {
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
println!("addr: {:?}", listener.local_addr());
tx.send(()).unwrap();
});

futures::executor::block_on(rx).unwrap();
}