Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into mental/add-mutex-…
Browse files Browse the repository at this point in the history
…const-constructors
  • Loading branch information
mental32 committed Sep 9, 2020
2 parents a1fa12a + 1550dda commit 3fc62b2
Show file tree
Hide file tree
Showing 38 changed files with 891 additions and 284 deletions.
25 changes: 24 additions & 1 deletion tests-integration/tests/process_stdio.rs
Expand Up @@ -72,7 +72,7 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
};

// Compose reading and writing concurrently.
future::join3(write, read, cat)
future::join3(write, read, cat.wait())
.map(|(_, _, status)| status)
.await
}
Expand Down Expand Up @@ -125,3 +125,26 @@ async fn status_closes_any_pipes() {

assert_ok!(child.await);
}

#[tokio::test]
async fn try_wait() {
let mut child = cat().spawn().unwrap();

let id = child.id().expect("missing id");
assert!(id > 0);

assert_eq!(None, assert_ok!(child.try_wait()));

// Drop the child's stdio handles so it can terminate
drop(child.stdin.take());
drop(child.stderr.take());
drop(child.stdout.take());

assert_ok!(child.wait().await);

// test that the `.try_wait()` method is fused just like the stdlib
assert!(assert_ok!(child.try_wait()).unwrap().success());

// Can't get id after process has exited
assert_eq!(child.id(), None);
}
14 changes: 12 additions & 2 deletions tokio-macros/src/lib.rs
Expand Up @@ -26,7 +26,12 @@ use proc_macro::TokenStream;

/// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime`
/// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or
/// [Builder](../tokio/runtime/struct.builder.html) directly.
/// [Builder](../tokio/runtime/struct.Builder.html) directly.
///
/// Note: This macro is designed to be simplistic and targets applications that do not require
/// a complex setup. If provided functionality is not sufficient, user may be interested in
/// using [Builder](../tokio/runtime/struct.Builder.html), which provides a more powerful
/// interface.
///
/// ## Options:
///
Expand Down Expand Up @@ -133,7 +138,12 @@ pub fn main_threaded(args: TokenStream, item: TokenStream) -> TokenStream {

/// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime`
/// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or
/// [Builder](../tokio/runtime/struct.builder.html) directly.
/// [Builder](../tokio/runtime/struct.Builder.html) directly.
///
/// Note: This macro is designed to be simplistic and targets applications that do not require
/// a complex setup. If provided functionality is not sufficient, user may be interested in
/// using [Builder](../tokio/runtime/struct.Builder.html), which provides a more powerful
/// interface.
///
/// ## Options:
///
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/Cargo.toml
Expand Up @@ -25,11 +25,12 @@ publish = false
default = []

# Shorthand for enabling everything
full = ["codec", "udp", "compat"]
full = ["codec", "udp", "compat", "io"]

compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
io = []

[dependencies]
tokio = { version = "0.3.0", path = "../tokio" }
Expand Down
10 changes: 10 additions & 0 deletions tokio-util/src/cfg.rs
Expand Up @@ -27,3 +27,13 @@ macro_rules! cfg_udp {
)*
}
}

macro_rules! cfg_io {
($($item:item)*) => {
$(
#[cfg(feature = "io")]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
$item
)*
}
}
194 changes: 194 additions & 0 deletions tokio-util/src/either.rs
@@ -0,0 +1,194 @@
//! Module defining an Either type.
use std::{
future::Future,
io::SeekFrom,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf, Result};

/// Combines two different futures, streams, or sinks having the same associated types into a single type.
///
/// This type implements common asynchronous traits such as [`Future`] and those in Tokio.
///
/// [`Future`]: std::future::Future
///
/// # Example
///
/// The following code will not work:
///
/// ```compile_fail
/// # fn some_condition() -> bool { true }
/// # async fn some_async_function() -> u32 { 10 }
/// # async fn other_async_function() -> u32 { 20 }
/// #[tokio::main]
/// async fn main() {
/// let result = if some_condition() {
/// some_async_function()
/// } else {
/// other_async_function() // <- Will print: "`if` and `else` have incompatible types"
/// };
///
/// println!("Result is {}", result.await);
/// }
/// ```
///
// This is because although the output types for both futures is the same, the exact future
// types are different, but the compiler must be able to choose a single type for the
// `result` variable.
///
/// When the output type is the same, we can wrap each future in `Either` to avoid the
/// issue:
///
/// ```
/// use tokio_util::either::Either;
/// # fn some_condition() -> bool { true }
/// # async fn some_async_function() -> u32 { 10 }
/// # async fn other_async_function() -> u32 { 20 }
///
/// #[tokio::main]
/// async fn main() {
/// let result = if some_condition() {
/// Either::Left(some_async_function())
/// } else {
/// Either::Right(other_async_function())
/// };
///
/// let value = result.await;
/// println!("Result is {}", value);
/// # assert_eq!(value, 10);
/// }
/// ```
#[allow(missing_docs)] // Doc-comments for variants in this particular case don't make much sense.
#[derive(Debug, Clone)]
pub enum Either<L, R> {
Left(L),
Right(R),
}

/// A small helper macro which reduces amount of boilerplate in the actual trait method implementation.
/// It takes an invokation of method as an argument (e.g. `self.poll(cx)`), and redirects it to either
/// enum variant held in `self`.
macro_rules! delegate_call {
($self:ident.$method:ident($($args:ident),+)) => {
unsafe {
match $self.get_unchecked_mut() {
Self::Left(l) => Pin::new_unchecked(l).$method($($args),+),
Self::Right(r) => Pin::new_unchecked(r).$method($($args),+),
}
}
}
}

impl<L, R, O> Future for Either<L, R>
where
L: Future<Output = O>,
R: Future<Output = O>,
{
type Output = O;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
delegate_call!(self.poll(cx))
}
}

impl<L, R> AsyncRead for Either<L, R>
where
L: AsyncRead,
R: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
delegate_call!(self.poll_read(cx, buf))
}
}

impl<L, R> AsyncBufRead for Either<L, R>
where
L: AsyncBufRead,
R: AsyncBufRead,
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
delegate_call!(self.poll_fill_buf(cx))
}

fn consume(self: Pin<&mut Self>, amt: usize) {
delegate_call!(self.consume(amt))
}
}

impl<L, R> AsyncSeek for Either<L, R>
where
L: AsyncSeek,
R: AsyncSeek,
{
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<Result<()>> {
delegate_call!(self.start_seek(cx, position))
}

fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
delegate_call!(self.poll_complete(cx))
}
}

impl<L, R> AsyncWrite for Either<L, R>
where
L: AsyncWrite,
R: AsyncWrite,
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
delegate_call!(self.poll_write(cx, buf))
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
delegate_call!(self.poll_flush(cx))
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
delegate_call!(self.poll_shutdown(cx))
}
}

impl<L, R> futures_core::stream::Stream for Either<L, R>
where
L: futures_core::stream::Stream,
R: futures_core::stream::Stream<Item = L::Item>,
{
type Item = L::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
delegate_call!(self.poll_next(cx))
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::{
io::{repeat, AsyncReadExt, Repeat},
stream::{once, Once, StreamExt},
};

#[tokio::test]
async fn either_is_stream() {
let mut either: Either<Once<u32>, Once<u32>> = Either::Left(once(1));

assert_eq!(Some(1u32), either.next().await);
}

#[tokio::test]
async fn either_is_async_read() {
let mut buffer = [0; 3];
let mut either: Either<Repeat, Repeat> = Either::Right(repeat(0b101));

either.read_exact(&mut buffer).await.unwrap();
assert_eq!(buffer, [0b101, 0b101, 0b101]);
}
}
13 changes: 13 additions & 0 deletions tokio-util/src/io/mod.rs
@@ -0,0 +1,13 @@
//! Helpers for IO related tasks.
//!
//! These types are often used in combination with hyper or reqwest, as they
//! allow converting between a hyper [`Body`] and [`AsyncRead`].
//!
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead

mod reader_stream;
mod stream_reader;

pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;

0 comments on commit 3fc62b2

Please sign in to comment.